Compare commits

..

37 Commits

Author SHA1 Message Date
you fba6c2c2e3 fix(ingestor): address review BLOCKERs from PR #926 (goroutine leak + guard semantics)
BLOCKER 1: Add client.Disconnect(0) on hard-failure error path to prevent
Paho retry goroutine leaks when ConnectRetry=true.

BLOCKER 2: Change fatal guard from len(clients)==0 to connectedCount==0.
Timed-out-but-retrying clients no longer mask zero-connected state.
Single unreachable broker correctly fatals instead of silently running.

Includes PR #926's WaitTimeout fix for #910, plus both blocker fixes.

Tests: TestBL1_GoroutineLeakOnHardFailure, TestBL2_ZeroConnectedFatals,
TestMQTTConnectRetryTimeoutDoesNotBlock
2026-05-02 18:19:44 +00:00
efiten 40c3aa13f9 fix(paths): exclude false-positive paths from short-prefix collisions (#930)
Fixes #929

## Summary

- `handleNodePaths` pulls candidates from `byPathHop` using 2-char and
4-char prefix keys (e.g. `"7a"` for a node using 1-byte adverts)
- When two nodes share the same short prefix, paths through the *other*
node are included as candidates
- The `resolved_path` post-filter covers decoded packets but falls
through conservatively (`inIndex = true`) when `resolved_path` is NULL,
letting false positives reach the response

**Fix:** during the aggregation phase (which already calls `resolveHop`
per hop), add a `containsTarget` check. If every hop resolves to a
different node's pubkey, skip the path. Packets confirmed via the
full-pubkey index key or via SQL bypass the check. Unresolvable hops are
kept conservatively.

## Test plan
- [x] `TestHandleNodePaths_PrefixCollisionExclusion`: two nodes sharing
`"7a"` prefix; verifies the path with no `resolved_path` (false
positive) is excluded and the SQL-confirmed path (true positive) is
included
- [x] Full test suite: `go test github.com/corescope/server` — all pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 11:15:25 -07:00
Kpa-clawbot b47587f031 feat(#690): expose observer skew + per-hash evidence in clock UI (#906)
## Summary

UI completion of #690 — surfaces observer clock skew and per-hash
evidence that the backend already computes but wasn't exposed in the
frontend.

**Not related to #845/PR #894** (bimodal detection) — this is the UI
surface for the original #690 scope.

## Changes

### Backend: per-hash evidence in node clock-skew API (commit 1)
- Extended `GET /api/nodes/{pubkey}/clock-skew` to return
`recentHashEvidence` (most recent 10 hashes with per-observer
raw/corrected skew and observer offset) and `calibrationSummary`
(total/calibrated/uncalibrated counts).
- Evidence is cached during `ClockSkewEngine.Recompute()` — route
handler is cheap.
- Fleet endpoint omits evidence to keep payload small.

### Frontend: observer list page — clock offset column (commit 2)
- Added "Clock Offset" column to observers table.
- Fetches `/api/observers/clock-skew` once on page load, joins by
ObserverID.
- Color-coded severity badge + sample count tooltip.
- Singleton observers show "—" not "0".

### Frontend: observer-detail clock card (commit 3)
- Added clock offset card mirroring node clock card style.
- Shows: offset value, sample count, severity badge.
- Inline explainer describing how offset is computed from multi-observer
packets.

### Frontend: node clock card evidence panel (commit 4)
- Collapsible "Evidence" section in existing node clock skew card.
- Per-hash breakdown: observer count, median corrected skew,
per-observer raw/corrected/offset.
- Calibration summary line and plain-English severity reason at top.

## Test Results

```
go test ./... (cmd/server) — PASS (19.3s)
go test ./... (cmd/ingestor) — PASS (31.6s)
Frontend helpers: 610 passed, 0 failed
```

New test: `TestNodeClockSkew_EvidencePayload` — 3-observer scenario
verifying per-hash array shape, corrected = raw + offset math, and
median.

No frontend JS smoke test added — no existing test harness for
clock/observer rendering. Noted for future.

## Screenshots

Screenshots TBD

## Perf justification

Evidence is computed inside the existing `Recompute()` cycle (already
O(n) on samples). The `hashEvidence` map adds ~32 bytes per sample of
memory. Evidence is stripped from fleet responses. Per-node endpoint
returns at most 10 evidence entries — bounded payload.

---------

Co-authored-by: you <you@example.com>
2026-05-02 10:30:54 -07:00
Kpa-clawbot c67f3347ce fix(ui): add GRP_DATA (type 6) to filter dropdown + color tables (#965)
## Bug

Packet type 6 (`PAYLOAD_TYPE_GRP_DATA` per `firmware/src/Packet.h:25`)
was missing from three frontend lookup tables:
- `public/app.js:7` — `PAYLOAD_COLORS` had no entry for 6 → badge color
fell back to `unknown` (grey)
- `public/packets.js:29` — `TYPE_NAMES` (used by the Packets page
type-filter dropdown) had no entry for 6 → "Group Data" missing from the
menu
- `public/roles.js:17,24` — `TYPE_COLORS` and `TYPE_BADGE_MAP` had no
`GRP_DATA` entry → no dedicated CSS class

The packet detail page already handled it (via `PAYLOAD_TYPES` in
`app.js:6` which had `6: 'Group Data'`) so individual GRP_DATA packets
render correctly. The gap was only in the filter UI + badge styling.

## Fix

Add the missing entry in each table. 4 lines across 3 files.

- `app.js`: add `6: 'grp-data'` to `PAYLOAD_COLORS`
- `packets.js`: add `6:'Group Data'` to `TYPE_NAMES`
- `roles.js`: add `GRP_DATA: '#8b5cf6'` to `TYPE_COLORS` and `GRP_DATA:
'grp-data'` to `TYPE_BADGE_MAP`

Color choice `#8b5cf6` (violet) — distinct from GRP_TXT's blue but
visually adjacent so operators read them as related types.

## Verification (rule 18 + 19)

Built server locally, served the JS files, grepped the rendered output:

```
$ curl -s http://localhost:13900/packets.js | grep TYPE_NAMES
const TYPE_NAMES = { ... 5:'Channel Msg', 6:'Group Data', 7:'Anon Req' ... };

$ curl -s http://localhost:13900/app.js | grep PAYLOAD_TYPES
const PAYLOAD_TYPES = { ... 5: 'Channel Msg', 6: 'Group Data', 7: 'Anon Req' ... };

$ curl -s http://localhost:13900/roles.js | grep GRP_DATA
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', GRP_DATA: '#8b5cf6', ...
ADVERT: 'advert', GRP_TXT: 'grp-txt', GRP_DATA: 'grp-data', ...
```

Frontend tests pass: `test-packets.js` 82/82, `test-hash-color.js`
32/32.

## Out of scope

Consolidating the duplicated PAYLOAD_TYPES / TYPE_NAMES tables into a
single source of truth is a separate cleanup. Two parallel name maps
continues to be a footgun (this is the second time a new type's been
added to one but not the other).

Co-authored-by: Kpa-clawbot <bot@example.invalid>
2026-05-02 09:55:09 -07:00
Kpa-clawbot b3a9677c52 feat(ingestor + server): observerBlacklist config (#962) (#963)
## Summary

Implements `observerBlacklist` config — mirrors the existing
`nodeBlacklist` pattern for observers. Drop observers by pubkey at
ingest, with defense-in-depth filtering on the server side.

Closes #962

## Changes

### Ingestor (`cmd/ingestor/`)
- **`config.go`**: Added `ObserverBlacklist []string` field +
`IsObserverBlacklisted()` method (case-insensitive, whitespace-trimmed)
- **`main.go`**: Early return in `handleMessage` when `parts[2]`
(observer ID from MQTT topic) matches blacklist — before status
handling, before IATA filter. No UpsertObserver, no observations, no
metrics insert. Log line: `observer <pubkey-short> blacklisted,
dropping`

### Server (`cmd/server/`)
- **`config.go`**: Same `ObserverBlacklist` field +
`IsObserverBlacklisted()` with `sync.Once` cached set (same pattern as
`nodeBlacklist`)
- **`routes.go`**: Defense-in-depth filtering in `handleObservers` (skip
blacklisted in list) and `handleObserverDetail` (404 for blacklisted ID)
- **`main.go`**: Startup `softDeleteBlacklistedObservers()` marks
matching rows `inactive=1` so historical data is hidden
- **`neighbor_persist.go`**: `softDeleteBlacklistedObservers()`
implementation

### Tests
- `cmd/ingestor/observer_blacklist_test.go`: config method tests
(case-insensitive, empty, nil)
- `cmd/server/observer_blacklist_test.go`: config tests + HTTP handler
tests (list excludes blacklisted, detail returns 404, no-blacklist
passes all, concurrent safety)

## Config

```json
{
  "observerBlacklist": [
    "EE550DE547D7B94848A952C98F585881FCF946A128E72905E95517475F83CFB1"
  ]
}
```

## Verification (Rule 18 — actual server output)

**Before blacklist** (no config):
```
Total: 31
DUBLIN in list: True
```

**After blacklist** (DUBLIN Observer pubkey in `observerBlacklist`):
```
[observer-blacklist] soft-deleted 1 blacklisted observer(s)
Total: 30
DUBLIN in list: False
```

Detail endpoint for blacklisted observer returns **404**.

All existing tests pass (`go test ./...` for both server and ingestor).

---------

Co-authored-by: you <you@example.com>
2026-05-01 23:11:27 -07:00
Kpa-clawbot 707228ad91 ci: update go-server-coverage.json [skip ci] 2026-05-02 02:14:13 +00:00
Kpa-clawbot 8d379baf5e ci: update go-ingestor-coverage.json [skip ci] 2026-05-02 02:14:12 +00:00
Kpa-clawbot 3b436c768b ci: update frontend-tests.json [skip ci] 2026-05-02 02:14:11 +00:00
Kpa-clawbot 6d49cf939c ci: update frontend-coverage.json [skip ci] 2026-05-02 02:14:09 +00:00
Kpa-clawbot 8d39b33111 ci: update e2e-tests.json [skip ci] 2026-05-02 02:14:08 +00:00
Kpa-clawbot e1a1be1735 fix(server): add observers.inactive column at startup if missing (root cause of CI flake) (#961)
## The actual root cause

PR #954 added `WHERE inactive IS NULL OR inactive = 0` to the server's
observer queries, but the `inactive` column is only added by the
**ingestor** migration (`cmd/ingestor/db.go:344-354`). When the server
runs against a DB the ingestor never touched (e.g. the e2e fixture), the
column doesn't exist:

```
$ sqlite3 test-fixtures/e2e-fixture.db "SELECT COUNT(*) FROM observers WHERE inactive IS NULL OR inactive = 0;"
Error: no such column: inactive
```

The server's `db.QueryRow().Scan()` swallows that error →
`totalObservers` stays 0 → `/api/observers` returns empty → map test
fails with "No map markers/overlays found".

This explains all the failing CI runs since #954 merged. PR #957
(freshen fixture) helped with the `nodes` time-rot but couldn't fix the
missing-column problem. PR #960 (freshen observers) added the right
timestamps but the column was still missing. PR #959 (data-loaded in
finally) fixed a different real bug. None of those touched the actual
mechanism.

## Fix

Mirror the existing `ensureResolvedPathColumn` pattern: add
`ensureObserverInactiveColumn` that runs at server startup, checks if
the column exists via `PRAGMA table_info`, adds it with `ALTER TABLE
observers ADD COLUMN inactive INTEGER DEFAULT 0` if missing.

Wired into `cmd/server/main.go` immediately after
`ensureResolvedPathColumn`.

## Verification

End-to-end on a freshened fixture:

```
$ sqlite3 /tmp/e2e-verify.db "PRAGMA table_info(observers);" | grep inactive
(no output — column absent)

$ ./cs-fixed -port 13702 -db /tmp/e2e-verify.db -public public &
[store] Added inactive column to observers

$ curl 'http://localhost:13702/api/observers'
returned=31    # was 0 before fix
```

`go test ./...` passes (19.8s).

## Lessons

I should have run `sqlite3 fixture "SELECT ... WHERE inactive ..."`
directly the first time the map test failed after #954 instead of
writing four "fix" PRs that didn't address the actual mechanism.
Apologies for the wild goose chase.

Co-authored-by: Kpa-clawbot <bot@example.invalid>
2026-05-01 19:04:23 -07:00
Kpa-clawbot b97fe5758c fix(ci): freshen observer timestamps so RemoveStaleObservers doesn't prune them on startup (#960)
## Bug

Master CI failing on `Map page loads with markers: No map
markers/overlays found` since #954 (observer filter) merged.

## Root cause chain

1. Fixture has 31 observers, all dated `2026-03-26` to `2026-03-29` (33+
days old)
2. PR #957's `tools/freshen-fixture.sh` shifts `nodes`, `transmissions`,
`neighbor_edges` timestamps but NOT `observers.last_seen`
3. Server startup runs `RemoveStaleObservers(14)` per
`cmd/server/main.go:382` — marks all 33-day-old observers `inactive=1`
4. PR #954's `GetObservers` filter then excludes them
5. `/api/observers` returns 0 → map has no observer markers → test
asserts >0 → fails

Server log line confirms: `[db] transmissions=499 observations=500
nodes=200 observers=0`

## Fix

Extend `freshen-fixture.sh` to also shift `observers.last_seen` (same
algorithm — preserve relative ordering, max anchored to now). Also
defensively clear any stale `inactive=1` flags from prior failed runs.
The `inactive` column may not exist on a fresh fixture (server adds via
migration); script silently no-ops if column absent.

## Verification

```
$ bash tools/freshen-fixture.sh /tmp/test.db
nodes: min=2026-05-01T11:07:29Z max=2026-05-01T18:49:02Z
observers: count=31 max=2026-05-01T18:49:02Z
```

After: 31 observers, oldest 3 days old, within the 14d retention window.
Server's startup prune won't touch them.

Co-authored-by: Kpa-clawbot <bot@example.invalid>
2026-05-01 16:55:25 -07:00
Kpa-clawbot 568de4b441 fix(observers): exclude soft-deleted observers from /api/observers and totalObservers (#954)
## Bug

`/api/observers` returned soft-deleted (inactive=1) observers. Operators
saw stale observers in the UI even after the auto-prune marked them
inactive on schedule. Reproduced on staging: 14 observers older than 14
days returned by the API; all of them had `inactive=1` in the DB.

## Root cause

`DB.GetObservers()` (`cmd/server/db.go:974`) ran `SELECT ... FROM
observers ORDER BY last_seen DESC` with no WHERE filter. The
`RemoveStaleObservers` path correctly soft-deletes by setting
`inactive=1`, but the read path didn't honor it.

`statsRow` (`cmd/server/db.go:234`) had the same bug — `totalObservers`
count included soft-deleted rows.

## Fix

Add `WHERE inactive IS NULL OR inactive = 0` to both:

```go
// GetObservers
"SELECT ... FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC"

// statsRow.TotalObservers
"SELECT COUNT(*) FROM observers WHERE inactive IS NULL OR inactive = 0"
```

`NULL` check preserves backward compatibility with rows from before the
`inactive` migration.

## Tests

Added regression `TestGetObservers_ExcludesInactive`:
- Seed two observers, mark one inactive, assert `GetObservers()` returns
only the other.
- **Anti-tautology gate verified**: reverting the WHERE clause causes
the test to fail with `expected 1 observer, got 2` and `inactive
observer obs2 should be excluded`.

`go test ./...` passes (19.6s).

## Out of scope

- `GetObserverByID` lookup at line 1009 still returns inactive observers
— this is intentional, so an old deep link to `/observers/<id>` shows
"inactive" rather than 404.
- Frontend may also have its own caching layer; this fix is server-side
only.

---------

Co-authored-by: Kpa-clawbot <bot@example.invalid>
Co-authored-by: you <you@example.com>
Co-authored-by: KpaBap <kpabap@gmail.com>
2026-05-01 17:51:08 +00:00
Kpa-clawbot 04c8558768 fix(spa): data-loaded setAttribute in finally so it fires on errors (#959)
## Bug

PR #958 added `data-loaded="true"` attributes for E2E sync, but placed
the `setAttribute` call inside the `try` block of `loadNodes()` /
`loadPackets()` / `loadNodes()` (map). When the API call failed (e.g.
`/api/observers` returns 500, or any other exception), the `catch`
swallowed the error and `setAttribute` was never reached. E2E tests then
waited 15s for `[data-loaded="true"]` and timed out.

This blocked PR #954 CI repeatedly with `Map page loads with markers:
page.waitForSelector: Timeout 15000ms exceeded`.

## Fix

Move `setAttribute('data-loaded', 'true')` to a `finally` block in all
three handlers (`map.js`, `nodes.js`, `packets.js`). The attribute now
fires on both success and error paths, so E2E tests proceed (test still
asserts on the actual rendered state — markers, rows, etc — so an empty
page still fails the right assertion, just much faster).

Removed the duplicate setAttribute calls inside the try blocks (the
finally is the single source of truth now).

## Verification

- `node test-packets.js` 82/82 
- `node test-hash-color.js` 32/32 
- Code reading: each `finally` runs after either success or catch, sets
the same attribute on the same container element.

## Why CI didn't catch this on #958

The PR #958 tests passed because the staging fixture happened to load
successfully when those tests ran. The flake only manifests when an
upstream fetch fails (e.g. observer API returning unexpected shape,
network blip, server still warming).

Co-authored-by: Kpa-clawbot <bot@example.invalid>
2026-05-01 10:49:21 -07:00
Kpa-clawbot 52b5ae86d6 ci: update go-server-coverage.json [skip ci] 2026-05-01 15:17:33 +00:00
Kpa-clawbot 8397f2bb1c ci: update go-ingestor-coverage.json [skip ci] 2026-05-01 15:17:32 +00:00
Kpa-clawbot ed65498281 ci: update frontend-tests.json [skip ci] 2026-05-01 15:17:31 +00:00
Kpa-clawbot c53af5cf66 ci: update frontend-coverage.json [skip ci] 2026-05-01 15:17:30 +00:00
Kpa-clawbot 9f606600e2 ci: update e2e-tests.json [skip ci] 2026-05-01 15:17:28 +00:00
Kpa-clawbot 053aef1994 fix(spa): decouple navigate() from theme fetch + add data-loaded sync attributes (#955) (#958)
## Summary

Fixes the chained async init race identified in RCA #3 of #955.

`navigate()` (which dispatches page handlers and fetches data) was gated
behind `/api/config/theme` resolving via `.finally()`. Tests use
`waitUntil: 'domcontentloaded'` which returns BEFORE theme fetch
resolves, creating a race condition where 3+ serial network requests
must complete before any DOM rows appear.

## Changes

### Decouple navigate() from theme fetch (public/app.js)
- Move `navigate()` call out of the theme fetch `.finally()` block
- Call it immediately on DOMContentLoaded — theme is purely cosmetic and
applies in parallel

### Add data-loaded sync attributes (public/nodes.js, map.js,
packets.js)
- Set `data-loaded="true"` on the container element after each page's
data fetch resolves and DOM renders
- Nodes: set on `#nodesLeft` after `loadNodes()` renders rows
- Map: set on `#leaflet-map` after `renderMarkers()` completes
- Packets: set on `#pktLeft` after `loadPackets()` renders rows

### Update E2E tests (test-e2e-playwright.js)
- Add `await page.waitForSelector('[data-loaded="true"]', { timeout:
15000 })` before row/marker assertions
- Increase map marker timeout from 3s to 8s as additional safety margin
- Tests now synchronize on data readiness rather than racing DOM
appearance

## Verification

- Spun up local server on port 13586 with e2e-fixture.db
- Confirmed navigate() is called immediately (not gated on theme)
- Confirmed data-loaded attributes are present in served JS
- API returns data correctly (2 nodes from fixture)

Closes #955 (RCA #3)

Co-authored-by: you <you@example.com>
2026-05-01 08:07:08 -07:00
Kpa-clawbot 7aef3c355c fix(ci): freshen fixture timestamps before E2E to avoid time-based filter exclusion (#955) (#957)
## Problem

The E2E fixture DB (`test-fixtures/e2e-fixture.db`) has static
timestamps from March 29, 2026. The map page applies a default
`lastHeard=30d` filter, so once the fixture ages past 30 days all nodes
are excluded from `/api/nodes?lastHeard=30d` — causing the "Map page
loads with markers" test to fail deterministically.

This started blocking all CI on ~April 28, 2026 (30 days after March
29).

Closes #955 (RCA #1: time-based fixture rot)

## Fix

Added `tools/freshen-fixture.sh` — a small script that shifts all
`last_seen`/`first_seen` timestamps forward so the newest is near
`now()`, preserving relative ordering between nodes. Runs in CI before
the Go server starts. Does **not** modify the checked-in fixture (no
binary blob churn).

## Verification

```
$ cp test-fixtures/e2e-fixture.db /tmp/fix4.db
$ bash tools/freshen-fixture.sh /tmp/fix4.db
Fixture timestamps freshened in /tmp/fix4.db
nodes: min=2026-05-01T07:10:00Z max=2026-05-01T14:51:33Z

$ ./corescope-server -port 13585 -db /tmp/fix4.db -public public &
$ curl -s "http://localhost:13585/api/nodes?limit=200&lastHeard=30d" | jq '{total, count: (.nodes | length)}'
{
  "total": 200,
  "count": 200
}
```

All 200 nodes returned with the 30-day filter after freshening (vs 0
without the fix).

Co-authored-by: you <you@example.com>
2026-05-01 08:06:19 -07:00
Kpa-clawbot 9ac484607f ci: update go-server-coverage.json [skip ci] 2026-05-01 15:05:20 +00:00
Kpa-clawbot b562de32ff ci: update go-ingestor-coverage.json [skip ci] 2026-05-01 15:05:19 +00:00
Kpa-clawbot 6f0c58c94a ci: update frontend-tests.json [skip ci] 2026-05-01 15:05:18 +00:00
Kpa-clawbot 7d1c679f4f ci: update frontend-coverage.json [skip ci] 2026-05-01 15:05:17 +00:00
Kpa-clawbot ead08c721d ci: update e2e-tests.json [skip ci] 2026-05-01 15:05:16 +00:00
Kpa-clawbot 57e272494d feat(server): /api/healthz readiness endpoint gated on store load (#955) (#956)
## Summary

Fixes RCA #2 from #955: the HTTP listener and `/api/stats` go live
before background goroutines (pickBestObservation, neighbor graph build)
finish, causing CI readiness checks to pass prematurely.

## Changes

1. **`cmd/server/healthz.go`** — New `GET /api/healthz` endpoint:
- Returns `503 {"ready":false,"reason":"loading"}` while background init
is running
   - Returns `200 {"ready":true,"loadedTx":N,"loadedObs":N}` once ready

2. **`cmd/server/main.go`** — Added `sync.WaitGroup` tracking
pickBestObservation and neighbor graph build goroutines. A coordinator
goroutine sets `readiness.Store(1)` when all complete.
`backfillResolvedPathsAsync` is NOT gated (async by design, can take 20+
min).

3. **`cmd/server/routes.go`** — Wired `/api/healthz` before system
endpoints.

4. **`.github/workflows/deploy.yml`** — CI wait-for-ready loop now polls
`/api/healthz` instead of `/api/stats`.

5. **`cmd/server/healthz_test.go`** — Tests for 503-before-ready,
200-after-ready, JSON shape, and anti-tautology gate.

## Rule 18 Verification

Built and ran against `test-fixtures/e2e-fixture.db` (499 tx):
- With the small fixture DB, init completes in <300ms so both immediate
and delayed curls return 200
- Unit tests confirm 503 behavior when `readiness=0` (simulating slow
init)
- On production DBs with 100K+ txs, the 503 window would be 5-15s
(pickBestObservation processes in 5000-tx chunks with 10ms yields)

## Test Results

```
=== RUN   TestHealthzNotReady    --- PASS
=== RUN   TestHealthzReady       --- PASS  
=== RUN   TestHealthzAntiTautology --- PASS
ok  github.com/corescope/server  19.662s (full suite)
```

Co-authored-by: you <you@example.com>
2026-05-01 07:55:57 -07:00
Kpa-clawbot d870a693d0 ci: update go-server-coverage.json [skip ci] 2026-05-01 09:36:22 +00:00
Kpa-clawbot d9904cc138 ci: update go-ingestor-coverage.json [skip ci] 2026-05-01 09:36:21 +00:00
Kpa-clawbot 7aa59eabde ci: update frontend-tests.json [skip ci] 2026-05-01 09:36:20 +00:00
Kpa-clawbot ac7d2b64f7 ci: update frontend-coverage.json [skip ci] 2026-05-01 09:36:19 +00:00
Kpa-clawbot fd3bf1a892 ci: update e2e-tests.json [skip ci] 2026-05-01 09:36:18 +00:00
Kpa-clawbot f16afe7fdf ci: update go-server-coverage.json [skip ci] 2026-05-01 09:34:29 +00:00
Kpa-clawbot ed66e54e57 ci: update go-ingestor-coverage.json [skip ci] 2026-05-01 09:34:28 +00:00
Kpa-clawbot 22079a1fc4 ci: update frontend-tests.json [skip ci] 2026-05-01 09:34:27 +00:00
Kpa-clawbot 232882d308 ci: update frontend-coverage.json [skip ci] 2026-05-01 09:34:25 +00:00
Kpa-clawbot fb640bcfc3 ci: update e2e-tests.json [skip ci] 2026-05-01 09:34:25 +00:00
28 changed files with 1186 additions and 40 deletions
+1 -1
View File
@@ -1 +1 @@
{"schemaVersion":1,"label":"frontend coverage","message":"36.12%","color":"red"}
{"schemaVersion":1,"label":"frontend coverage","message":"40.21%","color":"red"}
+4 -1
View File
@@ -176,6 +176,9 @@ jobs:
- name: Instrument frontend JS for coverage
run: sh scripts/instrument-frontend.sh
- name: Freshen fixture timestamps
run: bash tools/freshen-fixture.sh test-fixtures/e2e-fixture.db
- name: Start Go server with fixture DB
run: |
fuser -k 13581/tcp 2>/dev/null || true
@@ -183,7 +186,7 @@ jobs:
./corescope-server -port 13581 -db test-fixtures/e2e-fixture.db -public public-instrumented &
echo $! > .server.pid
for i in $(seq 1 30); do
if curl -sf http://localhost:13581/api/stats > /dev/null 2>&1; then
if curl -sf http://localhost:13581/api/healthz > /dev/null 2>&1; then
echo "Server ready after ${i}s"
break
fi
+28
View File
@@ -7,6 +7,7 @@ import (
"log"
"os"
"strings"
"sync"
"github.com/meshcore-analyzer/geofilter"
)
@@ -42,6 +43,15 @@ type Config struct {
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
DB *DBConfig `json:"db,omitempty"`
// ObserverBlacklist is a list of observer public keys to drop at ingest.
// Messages from blacklisted observers are silently discarded — no DB writes,
// no UpsertObserver, no observations, no metrics.
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
// obsBlacklistSetCached is the lazily-built lowercase set for O(1) lookups.
obsBlacklistSetCached map[string]bool
obsBlacklistOnce sync.Once
}
// GeoFilterConfig is an alias for the shared geofilter.Config type.
@@ -114,6 +124,24 @@ func (c *Config) ObserverDaysOrDefault() int {
return 14
}
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
func (c *Config) IsObserverBlacklisted(id string) bool {
if c == nil || len(c.ObserverBlacklist) == 0 {
return false
}
c.obsBlacklistOnce.Do(func() {
m := make(map[string]bool, len(c.ObserverBlacklist))
for _, pk := range c.ObserverBlacklist {
trimmed := strings.ToLower(strings.TrimSpace(pk))
if trimmed != "" {
m[trimmed] = true
}
}
c.obsBlacklistSetCached = m
})
return c.obsBlacklistSetCached[strings.ToLower(strings.TrimSpace(id))]
}
// LoadConfig reads configuration from a JSON file, with env var overrides.
// If the config file does not exist, sensible defaults are used (zero-config startup).
func LoadConfig(path string) (*Config, error) {
+38 -6
View File
@@ -123,6 +123,7 @@ func main() {
// Connect to each MQTT source
var clients []mqtt.Client
connectedCount := 0
for _, source := range sources {
tag := source.Name
if tag == "" {
@@ -164,19 +165,43 @@ func main() {
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
// WaitTimeout lets startup proceed; the client keeps retrying in the background
// and OnConnect fires (subscribing) when it eventually connects (#910).
if !token.WaitTimeout(30 * time.Second) {
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
clients = append(clients, client)
continue
}
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
// With ConnectRetry=true, Connect() spawns background goroutines
// that leak if the client is simply discarded.
client.Disconnect(0)
continue
}
connectedCount++
clients = append(clients, client)
}
if len(clients) == 0 {
log.Fatal("no MQTT connections established — check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
// BL2 fix: require at least one immediately-connected source. Timed-out
// clients are retrying in background (tracked in clients) but don't count
// as "connected" — a single unreachable broker must not silently run with
// zero active connections.
if connectedCount == 0 {
// Clean up any timed-out clients still retrying
for _, c := range clients {
c.Disconnect(0)
}
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
}
log.Printf("Running — %d MQTT source(s) connected", len(clients))
if connectedCount < len(clients) {
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
} else {
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
}
// Wait for shutdown signal
sig := make(chan os.Signal, 1)
@@ -240,6 +265,13 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
return
}
// Observer blacklist: drop ALL messages from blacklisted observers before any
// DB writes (status, metrics, packets). Trumps IATA filter.
if len(parts) > 2 && cfg.IsObserverBlacklisted(parts[2]) {
log.Printf("MQTT [%s] observer %.8s blacklisted, dropping", tag, parts[2])
return
}
// Status topic: meshcore/<region>/<observer_id>/status
// IATA filter does NOT apply here — observer metadata (noise_floor, battery, etc.)
// is region-independent and should be accepted from all observers regardless of
+124
View File
@@ -5,8 +5,11 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"testing"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func TestToFloat64(t *testing.T) {
@@ -780,3 +783,124 @@ func TestIATAFilterDoesNotDropStatusMessages(t *testing.T) {
t.Error("packet from out-of-region BFL should still be filtered by IATA")
}
}
// TestMQTTConnectRetryTimeoutDoesNotBlock verifies that WaitTimeout returns within
// the deadline for an unreachable broker when ConnectRetry=true (#910). Previously,
// token.Wait() would block forever in this configuration.
func TestMQTTConnectRetryTimeoutDoesNotBlock(t *testing.T) {
opts := mqtt.NewClientOptions().
AddBroker("tcp://127.0.0.1:1"). // port 1 — nothing listening
SetConnectRetry(true).
SetAutoReconnect(true)
client := mqtt.NewClient(opts)
token := client.Connect()
defer client.Disconnect(100)
start := time.Now()
connected := token.WaitTimeout(3 * time.Second)
elapsed := time.Since(start)
if connected {
t.Skip("port 1 unexpectedly accepted a connection — skipping")
}
if elapsed > 4*time.Second {
t.Errorf("WaitTimeout blocked for %v — token.Wait() would block forever with ConnectRetry=true", elapsed)
}
}
// TestBL1_GoroutineLeakOnHardFailure reproduces BLOCKER 1: without Disconnect()
// on the error path, Paho's internal retry goroutines leak when a client is
// discarded after Connect() with ConnectRetry=true.
//
// We prove the leak by creating N clients WITHOUT Disconnect — goroutines grow
// proportionally. The fix (client.Disconnect(0) before continue) prevents this.
func TestBL1_GoroutineLeakOnHardFailure(t *testing.T) {
runtime.GC()
time.Sleep(100 * time.Millisecond)
baseline := runtime.NumGoroutine()
// Create multiple clients connected to unreachable broker, WITHOUT disconnecting.
// Each one spawns Paho retry goroutines that accumulate.
const numClients = 10
clients := make([]mqtt.Client, numClients)
for i := 0; i < numClients; i++ {
opts := mqtt.NewClientOptions().
AddBroker("tcp://127.0.0.1:1").
SetConnectRetry(true).
SetAutoReconnect(true).
SetConnectTimeout(500 * time.Millisecond)
c := mqtt.NewClient(opts)
tok := c.Connect()
tok.WaitTimeout(1 * time.Second)
clients[i] = c
}
time.Sleep(200 * time.Millisecond)
leaked := runtime.NumGoroutine()
goroutineGrowth := leaked - baseline
// Clean up to not actually leak in test
for _, c := range clients {
c.Disconnect(0)
}
t.Logf("baseline=%d, after %d undisconnected clients=%d, growth=%d",
baseline, numClients, leaked, goroutineGrowth)
// With ConnectRetry=true, each Connect() spawns retry goroutines.
// Without Disconnect, these accumulate. Verify growth is meaningful.
if goroutineGrowth < 3 {
t.Skip("Connect didn't spawn enough extra goroutines to measure leak")
}
// The fix: calling client.Disconnect(0) on the error path prevents accumulation.
// Anti-tautology: removing the Disconnect(0) call from main.go's error path
// would cause goroutine accumulation proportional to failed broker count.
t.Logf("CONFIRMED: %d leaked goroutines from %d clients without Disconnect — fix adds Disconnect(0) on error path", goroutineGrowth, numClients)
}
// TestBL2_ZeroConnectedFatals verifies BLOCKER 2: when all brokers are unreachable,
// connectedCount==0 must be detected. We test the logic directly — if only timed-out
// clients exist (appended to clients slice) but connectedCount is 0, the guard triggers.
func TestBL2_ZeroConnectedFatals(t *testing.T) {
// Simulate the connection loop result: 1 timed-out client, 0 connected
var clients []mqtt.Client
connectedCount := 0
// Create a client that times out (unreachable broker)
opts := mqtt.NewClientOptions().
AddBroker("tcp://127.0.0.1:1").
SetConnectRetry(true).
SetAutoReconnect(true)
client := mqtt.NewClient(opts)
token := client.Connect()
if !token.WaitTimeout(2 * time.Second) {
// Timed out — PR #926 appends to clients
clients = append(clients, client)
}
defer func() {
for _, c := range clients {
c.Disconnect(0)
}
}()
// OLD bug: len(clients) == 0 would be false (1 timed-out client in list)
// → ingestor would silently run with zero connections
if len(clients) == 0 {
t.Fatal("expected timed-out client to be in clients slice")
}
// NEW fix: connectedCount == 0 catches this
if connectedCount != 0 {
t.Errorf("connectedCount should be 0, got %d", connectedCount)
}
// The real code does: if connectedCount == 0 { log.Fatal(...) }
// This test proves len(clients) > 0 but connectedCount == 0 — the old guard
// would have missed it.
if len(clients) > 0 && connectedCount == 0 {
t.Log("BL2 confirmed: old guard len(clients)==0 would NOT fatal; new guard connectedCount==0 correctly catches zero-connected state")
}
}
+43
View File
@@ -0,0 +1,43 @@
package main
import (
"testing"
)
func TestIngestorIsObserverBlacklisted(t *testing.T) {
cfg := &Config{
ObserverBlacklist: []string{"OBS1", "obs2"},
}
tests := []struct {
id string
want bool
}{
{"OBS1", true},
{"obs1", true},
{"OBS2", true},
{"obs3", false},
{"", false},
}
for _, tt := range tests {
got := cfg.IsObserverBlacklisted(tt.id)
if got != tt.want {
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
}
}
}
func TestIngestorIsObserverBlacklistedEmpty(t *testing.T) {
cfg := &Config{}
if cfg.IsObserverBlacklisted("anything") {
t.Error("empty blacklist should not match")
}
}
func TestIngestorIsObserverBlacklistedNil(t *testing.T) {
var cfg *Config
if cfg.IsObserverBlacklisted("anything") {
t.Error("nil config should not match")
}
}
+123 -4
View File
@@ -120,6 +120,8 @@ type NodeClockSkew struct {
GoodFraction float64 `json:"goodFraction"` // fraction of recent samples with |skew| <= 1h
RecentBadSampleCount int `json:"recentBadSampleCount"` // count of recent samples with |skew| > 1h
RecentSampleCount int `json:"recentSampleCount"` // total recent samples in window
RecentHashEvidence []HashEvidence `json:"recentHashEvidence,omitempty"`
CalibrationSummary *CalibrationSummary `json:"calibrationSummary,omitempty"`
NodeName string `json:"nodeName,omitempty"` // populated in fleet responses
NodeRole string `json:"nodeRole,omitempty"` // populated in fleet responses
}
@@ -130,6 +132,31 @@ type SkewSample struct {
SkewSec float64 `json:"skew"` // corrected skew in seconds
}
// HashEvidenceObserver is one observer's contribution to a per-hash evidence entry.
type HashEvidenceObserver struct {
ObserverID string `json:"observerID"`
ObserverName string `json:"observerName"`
RawSkewSec float64 `json:"rawSkewSec"`
CorrectedSkewSec float64 `json:"correctedSkewSec"`
ObserverOffsetSec float64 `json:"observerOffsetSec"`
Calibrated bool `json:"calibrated"`
}
// HashEvidence is per-hash clock skew evidence showing individual observer contributions.
type HashEvidence struct {
Hash string `json:"hash"`
Observers []HashEvidenceObserver `json:"observers"`
MedianCorrectedSkewSec float64 `json:"medianCorrectedSkewSec"`
Timestamp int64 `json:"timestamp"`
}
// CalibrationSummary counts how many samples were corrected via observer calibration.
type CalibrationSummary struct {
TotalSamples int `json:"totalSamples"`
CalibratedSamples int `json:"calibratedSamples"`
UncalibratedSamples int `json:"uncalibratedSamples"`
}
// txSkewResult maps tx hash → per-transmission skew stats. This is an
// intermediate result keyed by hash (not pubkey); the store maps hash → pubkey
// when building the final per-node view.
@@ -143,15 +170,27 @@ type ClockSkewEngine struct {
observerOffsets map[string]float64 // observerID → calibrated offset (seconds)
observerSamples map[string]int // observerID → number of multi-observer packets used
nodeSkew txSkewResult
hashEvidence map[string][]hashEvidenceEntry // hash → per-observer raw/corrected data
lastComputed time.Time
computeInterval time.Duration
}
// hashEvidenceEntry stores raw evidence per observer per hash, cached during Recompute.
type hashEvidenceEntry struct {
observerID string
rawSkew float64
corrected float64
offset float64
calibrated bool
observedTS int64
}
func NewClockSkewEngine() *ClockSkewEngine {
return &ClockSkewEngine{
observerOffsets: make(map[string]float64),
observerSamples: make(map[string]int),
nodeSkew: make(txSkewResult),
hashEvidence: make(map[string][]hashEvidenceEntry),
computeInterval: 30 * time.Second,
}
}
@@ -176,14 +215,16 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
var newOffsets map[string]float64
var newSamples map[string]int
var newNodeSkew txSkewResult
var newHashEvidence map[string][]hashEvidenceEntry
if len(samples) > 0 {
newOffsets, newSamples = calibrateObservers(samples)
newNodeSkew = computeNodeSkew(samples, newOffsets)
newNodeSkew, newHashEvidence = computeNodeSkew(samples, newOffsets)
} else {
newOffsets = make(map[string]float64)
newSamples = make(map[string]int)
newNodeSkew = make(txSkewResult)
newHashEvidence = make(map[string][]hashEvidenceEntry)
}
// Swap results under brief write lock.
@@ -196,6 +237,7 @@ func (e *ClockSkewEngine) Recompute(store *PacketStore) {
e.observerOffsets = newOffsets
e.observerSamples = newSamples
e.nodeSkew = newNodeSkew
e.hashEvidence = newHashEvidence
e.lastComputed = time.Now()
e.mu.Unlock()
}
@@ -332,7 +374,7 @@ func calibrateObservers(samples []skewSample) (map[string]float64, map[string]in
// ── Phase 3: Per-Node Skew ─────────────────────────────────────────────────────
// computeNodeSkew calculates corrected skew statistics for each node.
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkewResult {
func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) (txSkewResult, map[string][]hashEvidenceEntry) {
// Compute corrected skew per sample, grouped by hash (each hash = one
// node's advert transmission). The caller maps hash → pubkey via byNode.
type correctedSample struct {
@@ -343,6 +385,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
byHash := make(map[string][]correctedSample)
hashAdvertTS := make(map[string]int64)
evidence := make(map[string][]hashEvidenceEntry) // hash → per-observer evidence
for _, s := range samples {
obsOffset, hasCal := obsOffsets[s.observerID]
@@ -359,6 +402,14 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
calibrated: hasCal,
})
hashAdvertTS[s.hash] = s.advertTS
evidence[s.hash] = append(evidence[s.hash], hashEvidenceEntry{
observerID: s.observerID,
rawSkew: round(rawSkew, 1),
corrected: round(corrected, 1),
offset: round(obsOffset, 1),
calibrated: hasCal,
observedTS: s.observedTS,
})
}
// Each hash represents one advert from one node. Compute median corrected
@@ -397,7 +448,7 @@ func computeNodeSkew(samples []skewSample, obsOffsets map[string]float64) txSkew
LastObservedTS: latestObsTS,
}
}
return result
return result, evidence
}
// ── Integration with PacketStore ───────────────────────────────────────────────
@@ -558,6 +609,70 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
samples[i] = SkewSample{Timestamp: p.ts, SkewSec: round(p.skew, 1)}
}
// Build per-hash evidence (most recent 10 hashes with ≥1 observer).
// Observer name lookup from store observations.
obsNameMap := make(map[string]string)
type hashMeta struct {
hash string
ts int64
}
var evidenceHashes []hashMeta
for _, tx := range txs {
if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT {
continue
}
ev, ok := s.clockSkew.hashEvidence[tx.Hash]
if !ok || len(ev) == 0 {
continue
}
// Collect observer names from tx observations.
for _, obs := range tx.Observations {
if obs.ObserverID != "" && obs.ObserverName != "" {
obsNameMap[obs.ObserverID] = obs.ObserverName
}
}
evidenceHashes = append(evidenceHashes, hashMeta{hash: tx.Hash, ts: ev[0].observedTS})
}
// Sort by timestamp descending, take most recent 10.
sort.Slice(evidenceHashes, func(i, j int) bool { return evidenceHashes[i].ts > evidenceHashes[j].ts })
if len(evidenceHashes) > 10 {
evidenceHashes = evidenceHashes[:10]
}
var recentEvidence []HashEvidence
var calSummary CalibrationSummary
for _, eh := range evidenceHashes {
entries := s.clockSkew.hashEvidence[eh.hash]
var observers []HashEvidenceObserver
var corrSkews []float64
for _, e := range entries {
name := obsNameMap[e.observerID]
if name == "" {
name = e.observerID
}
observers = append(observers, HashEvidenceObserver{
ObserverID: e.observerID,
ObserverName: name,
RawSkewSec: e.rawSkew,
CorrectedSkewSec: e.corrected,
ObserverOffsetSec: e.offset,
Calibrated: e.calibrated,
})
corrSkews = append(corrSkews, e.corrected)
calSummary.TotalSamples++
if e.calibrated {
calSummary.CalibratedSamples++
} else {
calSummary.UncalibratedSamples++
}
}
recentEvidence = append(recentEvidence, HashEvidence{
Hash: eh.hash,
Observers: observers,
MedianCorrectedSkewSec: round(median(corrSkews), 1),
Timestamp: eh.ts,
})
}
return &NodeClockSkew{
Pubkey: pubkey,
MeanSkewSec: round(meanSkew, 1),
@@ -574,6 +689,8 @@ func (s *PacketStore) getNodeClockSkewLocked(pubkey string) *NodeClockSkew {
GoodFraction: round(goodFraction, 2),
RecentBadSampleCount: recentBadCount,
RecentSampleCount: recentSampleCount,
RecentHashEvidence: recentEvidence,
CalibrationSummary: &calSummary,
}
}
@@ -601,8 +718,10 @@ func (s *PacketStore) GetFleetClockSkew() []*NodeClockSkew {
cs.NodeName = ni.Name
cs.NodeRole = ni.Role
}
// Omit samples in fleet response (too much data).
// Omit samples and evidence in fleet response (too much data).
cs.Samples = nil
cs.RecentHashEvidence = nil
cs.CalibrationSummary = nil
results = append(results, cs)
}
return results
+103 -2
View File
@@ -191,7 +191,7 @@ func TestComputeNodeSkew_BasicCorrection(t *testing.T) {
// So the median approach finds obs2 is +5 ahead (relative to median)
// Now compute node skew with those offsets:
nodeSkew := computeNodeSkew(samples, offsets)
nodeSkew, _ := computeNodeSkew(samples, offsets)
cs, ok := nodeSkew["h1"]
if !ok {
t.Fatal("expected skew data for hash h1")
@@ -220,7 +220,7 @@ func TestComputeNodeSkew_ThreeObservers(t *testing.T) {
t.Errorf("obs3 offset = %v, want 30", offsets["obs3"])
}
nodeSkew := computeNodeSkew(samples, offsets)
nodeSkew, _ := computeNodeSkew(samples, offsets)
cs := nodeSkew["h1"]
if cs == nil {
t.Fatal("expected skew data for h1")
@@ -954,3 +954,104 @@ func TestAllGood_OK_845(t *testing.T) {
t.Errorf("recentBadSampleCount = %v, want 0", r.RecentBadSampleCount)
}
}
func TestNodeClockSkew_EvidencePayload(t *testing.T) {
// 3-observer scenario: obs1 ahead by +2s, obs2 on time, obs3 behind by -1s.
// Node clock is 60s ahead. Raw skew = advertTS - obsTS.
// Hash has 3 observations, each observer sees same advert.
ps := NewPacketStore(nil, nil)
pt := 4 // ADVERT
// Advert timestamp: 1700000060 (node 60s ahead of true time 1700000000)
// obs1 sees at 1700000002 (2s ahead of true time) → raw = 60 - 2 = 58
// obs2 sees at 1700000000 (on time) → raw = 60 - 0 = 60
// obs3 sees at 1699999999 (-1s, behind) → raw = 60 + 1 = 61
// Median obsTS = 1700000000, so:
// obs1 offset = 1700000002 - 1700000000 = +2
// obs2 offset = 0
// obs3 offset = 1699999999 - 1700000000 = -1
// Corrected: raw + offset → obs1: 58+2=60, obs2: 60+0=60, obs3: 61+(-1)=60
tx1 := &StoreTx{
Hash: "evidence_hash_1",
PayloadType: &pt,
DecodedJSON: `{"payload":{"timestamp":1700000060}}`,
Observations: []*StoreObs{
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T22:13:22Z"},
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T22:13:20Z"},
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T22:13:19Z"},
},
}
// Second hash to ensure we get multiple evidence entries.
tx2 := &StoreTx{
Hash: "evidence_hash_2",
PayloadType: &pt,
DecodedJSON: `{"payload":{"timestamp":1700003660}}`,
Observations: []*StoreObs{
{ObserverID: "obs1", ObserverName: "Observer Alpha", Timestamp: "2023-11-14T23:13:22Z"},
{ObserverID: "obs2", ObserverName: "Observer Beta", Timestamp: "2023-11-14T23:13:20Z"},
{ObserverID: "obs3", ObserverName: "Observer Gamma", Timestamp: "2023-11-14T23:13:19Z"},
},
}
ps.mu.Lock()
ps.byNode["NODETEST"] = []*StoreTx{tx1, tx2}
ps.byPayloadType[4] = []*StoreTx{tx1, tx2}
ps.clockSkew.computeInterval = 0
ps.mu.Unlock()
r := ps.GetNodeClockSkew("NODETEST")
if r == nil {
t.Fatal("expected clock skew result")
}
// Check recentHashEvidence exists.
if len(r.RecentHashEvidence) == 0 {
t.Fatal("expected recentHashEvidence to be populated")
}
if len(r.RecentHashEvidence) != 2 {
t.Errorf("recentHashEvidence length = %d, want 2", len(r.RecentHashEvidence))
}
// Check first evidence entry has 3 observers.
ev := r.RecentHashEvidence[0]
if len(ev.Observers) != 3 {
t.Fatalf("evidence observers = %d, want 3", len(ev.Observers))
}
// Verify corrected = raw + offset for each observer.
for _, o := range ev.Observers {
expected := o.RawSkewSec + o.ObserverOffsetSec
if math.Abs(o.CorrectedSkewSec-expected) > 0.2 {
t.Errorf("observer %s: corrected=%.1f, expected raw(%.1f)+offset(%.1f)=%.1f",
o.ObserverID, o.CorrectedSkewSec, o.RawSkewSec, o.ObserverOffsetSec, expected)
}
}
// All corrected values should be ~60s (node is 60s ahead).
if math.Abs(ev.MedianCorrectedSkewSec-60) > 1 {
t.Errorf("median corrected = %.1f, want ~60", ev.MedianCorrectedSkewSec)
}
// Check calibration summary.
if r.CalibrationSummary == nil {
t.Fatal("expected calibrationSummary")
}
if r.CalibrationSummary.TotalSamples != 6 { // 3 observers × 2 hashes
t.Errorf("calibration total = %d, want 6", r.CalibrationSummary.TotalSamples)
}
if r.CalibrationSummary.CalibratedSamples != 6 {
t.Errorf("calibrated = %d, want 6 (all multi-observer)", r.CalibrationSummary.CalibratedSamples)
}
// Check observer names are populated.
nameFound := false
for _, o := range ev.Observers {
if o.ObserverName == "Observer Alpha" || o.ObserverName == "Observer Beta" {
nameFound = true
}
}
if !nameFound {
t.Error("expected observer names to be populated from tx observations")
}
}
+35
View File
@@ -72,6 +72,15 @@ type Config struct {
DebugAffinity bool `json:"debugAffinity,omitempty"`
// ObserverBlacklist is a list of observer public keys to exclude from API
// responses (defense in depth — ingestor drops at ingest, server filters
// any that slipped through from a prior unblocked window).
ObserverBlacklist []string `json:"observerBlacklist,omitempty"`
// obsBlacklistSetCached is the lazily-built set version of ObserverBlacklist.
obsBlacklistSetCached map[string]bool
obsBlacklistOnce sync.Once
ResolvedPath *ResolvedPathConfig `json:"resolvedPath,omitempty"`
NeighborGraph *NeighborGraphConfig `json:"neighborGraph,omitempty"`
}
@@ -404,3 +413,29 @@ func (c *Config) IsBlacklisted(pubkey string) bool {
}
return c.blacklistSet()[strings.ToLower(strings.TrimSpace(pubkey))]
}
// obsBlacklistSet lazily builds and caches the observerBlacklist as a set for O(1) lookups.
func (c *Config) obsBlacklistSet() map[string]bool {
c.obsBlacklistOnce.Do(func() {
if len(c.ObserverBlacklist) == 0 {
return
}
m := make(map[string]bool, len(c.ObserverBlacklist))
for _, pk := range c.ObserverBlacklist {
trimmed := strings.ToLower(strings.TrimSpace(pk))
if trimmed != "" {
m[trimmed] = true
}
}
c.obsBlacklistSetCached = m
})
return c.obsBlacklistSetCached
}
// IsObserverBlacklisted returns true if the given observer ID is in the observerBlacklist.
func (c *Config) IsObserverBlacklisted(id string) bool {
if c == nil || len(c.ObserverBlacklist) == 0 {
return false
}
return c.obsBlacklistSet()[strings.ToLower(strings.TrimSpace(id))]
}
+2 -1
View File
@@ -35,7 +35,8 @@ func setupTestDBv2(t *testing.T) *DB {
CREATE TABLE observers (
id TEXT PRIMARY KEY, name TEXT, iata TEXT, last_seen TEXT, first_seen TEXT,
packet_count INTEGER DEFAULT 0, model TEXT, firmware TEXT,
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL
client_version TEXT, radio TEXT, battery_mv INTEGER, uptime_secs INTEGER, noise_floor REAL,
inactive INTEGER DEFAULT 0
);
CREATE TABLE transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL,
+3 -3
View File
@@ -231,7 +231,7 @@ func (db *DB) GetStats() (*Stats, error) {
sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Format(time.RFC3339)
db.conn.QueryRow("SELECT COUNT(*) FROM nodes WHERE last_seen > ?", sevenDaysAgo).Scan(&s.TotalNodes)
db.conn.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&s.TotalNodesAllTime)
db.conn.QueryRow("SELECT COUNT(*) FROM observers").Scan(&s.TotalObservers)
db.conn.QueryRow("SELECT COUNT(*) FROM observers WHERE inactive IS NULL OR inactive = 0").Scan(&s.TotalObservers)
oneHourAgo := time.Now().Add(-1 * time.Hour).Unix()
db.conn.QueryRow("SELECT COUNT(*) FROM observations WHERE timestamp > ?", oneHourAgo).Scan(&s.PacketsLastHour)
@@ -970,9 +970,9 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
return result
}
// GetObservers returns all observers sorted by last_seen DESC.
// GetObservers returns active observers (not soft-deleted) sorted by last_seen DESC.
func (db *DB) GetObservers() ([]Observer, error) {
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC")
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC")
if err != nil {
return nil, err
}
+27 -1
View File
@@ -48,7 +48,8 @@ func setupTestDB(t *testing.T) *DB {
radio TEXT,
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor REAL
noise_floor REAL,
inactive INTEGER DEFAULT 0
);
CREATE TABLE transmissions (
@@ -357,6 +358,31 @@ func TestGetObservers(t *testing.T) {
}
}
// Regression: GetObservers must exclude soft-deleted (inactive=1) rows.
// Stale observers were appearing in /api/observers despite the auto-prune
// marking them inactive, because the SELECT query had no WHERE filter.
func TestGetObservers_ExcludesInactive(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
// Mark obs2 inactive — soft delete simulating a stale-observer prune.
if _, err := db.conn.Exec(`UPDATE observers SET inactive = 1 WHERE id = ?`, "obs2"); err != nil {
t.Fatalf("update inactive: %v", err)
}
observers, err := db.GetObservers()
if err != nil {
t.Fatal(err)
}
if len(observers) != 1 {
t.Errorf("expected 1 observer (obs1) after marking obs2 inactive, got %d", len(observers))
}
for _, o := range observers {
if o.ID == "obs2" {
t.Errorf("inactive observer obs2 should be excluded")
}
}
}
func TestGetObserverByID(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
+43
View File
@@ -0,0 +1,43 @@
package main
import (
"encoding/json"
"net/http"
"sync/atomic"
)
// readiness tracks whether background init goroutines have completed.
// Set to 1 once store.Load, pickBestObservation, and neighbor graph build are done.
var readiness atomic.Int32
// handleHealthz returns 200 when the server is ready to serve queries,
// or 503 while background initialization is still running.
func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if readiness.Load() == 0 {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]interface{}{
"ready": false,
"reason": "loading",
})
return
}
var loadedTx, loadedObs int
if s.store != nil {
s.store.mu.RLock()
loadedTx = len(s.store.packets)
for _, p := range s.store.packets {
loadedObs += len(p.Observations)
}
s.store.mu.RUnlock()
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"ready": true,
"loadedTx": loadedTx,
"loadedObs": loadedObs,
})
}
+80
View File
@@ -0,0 +1,80 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestHealthzNotReady(t *testing.T) {
// Ensure readiness is 0 (not ready)
readiness.Store(0)
defer readiness.Store(0)
srv := &Server{store: &PacketStore{}}
req := httptest.NewRequest("GET", "/api/healthz", nil)
w := httptest.NewRecorder()
srv.handleHealthz(w, req)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("expected 503, got %d", w.Code)
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if resp["ready"] != false {
t.Fatalf("expected ready=false, got %v", resp["ready"])
}
if resp["reason"] != "loading" {
t.Fatalf("expected reason=loading, got %v", resp["reason"])
}
}
func TestHealthzReady(t *testing.T) {
readiness.Store(1)
defer readiness.Store(0)
srv := &Server{store: &PacketStore{}}
req := httptest.NewRequest("GET", "/api/healthz", nil)
w := httptest.NewRecorder()
srv.handleHealthz(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if resp["ready"] != true {
t.Fatalf("expected ready=true, got %v", resp["ready"])
}
if _, ok := resp["loadedTx"]; !ok {
t.Fatal("missing loadedTx field")
}
if _, ok := resp["loadedObs"]; !ok {
t.Fatal("missing loadedObs field")
}
}
func TestHealthzAntiTautology(t *testing.T) {
// When readiness is 0, must NOT return 200
readiness.Store(0)
defer readiness.Store(0)
srv := &Server{store: &PacketStore{}}
req := httptest.NewRequest("GET", "/api/healthz", nil)
w := httptest.NewRecorder()
srv.handleHealthz(w, req)
if w.Code == http.StatusOK {
t.Fatal("anti-tautology: handler returned 200 when readiness=0; gating is broken")
}
}
+26
View File
@@ -174,6 +174,21 @@ func main() {
database.hasResolvedPath = true // detectSchema ran before column was added; fix the flag
}
// Ensure observers.inactive column exists (PR #954 filters on it; ingestor migration
// adds it but server may run against DBs ingestor never touched, e.g. e2e fixture).
if err := ensureObserverInactiveColumn(dbPath); err != nil {
log.Printf("[store] warning: could not add observers.inactive column: %v", err)
}
// Soft-delete observers that are in the blacklist (mark inactive=1) so
// historical data from a prior unblocked window is hidden too.
if len(cfg.ObserverBlacklist) > 0 {
softDeleteBlacklistedObservers(dbPath, cfg.ObserverBlacklist)
}
// WaitGroup for background init steps that gate /api/healthz readiness.
var initWg sync.WaitGroup
// Load or build neighbor graph
if neighborEdgesTableExists(database.conn) {
store.graph = loadNeighborEdgesFromDB(database.conn)
@@ -181,7 +196,9 @@ func main() {
} else {
log.Printf("[neighbor] no persisted edges found, will build in background...")
store.graph = NewNeighborGraph() // empty graph — gets populated by background goroutine
initWg.Add(1)
go func() {
defer initWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[neighbor] graph build panic recovered: %v", r)
@@ -205,7 +222,9 @@ func main() {
// API serves best-effort data until this completes (~10s for 100K txs).
// Processes in chunks of 5000, releasing the lock between chunks so API
// handlers remain responsive.
initWg.Add(1)
go func() {
defer initWg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[store] pickBestObservation panic recovered: %v", r)
@@ -233,6 +252,13 @@ func main() {
log.Printf("[store] initial pickBestObservation complete (%d transmissions)", totalPackets)
}()
// Mark server ready once all background init completes.
go func() {
initWg.Wait()
readiness.Store(1)
log.Printf("[server] readiness: ready=true (background init complete)")
}()
// WebSocket hub
hub := NewHub()
+74
View File
@@ -281,6 +281,80 @@ func ensureResolvedPathColumn(dbPath string) error {
return nil
}
// ensureObserverInactiveColumn adds the inactive column to observers if missing.
// The column was originally added by ingestor migration (cmd/ingestor/db.go:344) to
// support soft-delete via RemoveStaleObservers + filtered reads (PR #954). When the
// server starts against a DB that was never touched by the ingestor (e.g. the e2e
// fixture), the column is missing and read queries that filter on it (GetObservers,
// GetStats) silently fail with "no such column: inactive" — leaving /api/observers
// returning empty.
func ensureObserverInactiveColumn(dbPath string) error {
rw, err := openRW(dbPath)
if err != nil {
return err
}
defer rw.Close()
rows, err := rw.Query("PRAGMA table_info(observers)")
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var cid int
var colName string
var colType sql.NullString
var notNull, pk int
var dflt sql.NullString
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil && colName == "inactive" {
return nil // already exists
}
}
_, err = rw.Exec("ALTER TABLE observers ADD COLUMN inactive INTEGER DEFAULT 0")
if err != nil {
return fmt.Errorf("add inactive column: %w", err)
}
log.Println("[store] Added inactive column to observers")
return nil
}
// softDeleteBlacklistedObservers marks observers matching the blacklist as
// inactive=1 so they are hidden from API responses. Runs once at startup.
func softDeleteBlacklistedObservers(dbPath string, blacklist []string) {
rw, err := openRW(dbPath)
if err != nil {
log.Printf("[observer-blacklist] warning: could not open DB for soft-delete: %v", err)
return
}
defer rw.Close()
placeholders := make([]string, 0, len(blacklist))
args := make([]interface{}, 0, len(blacklist))
for _, pk := range blacklist {
trimmed := strings.TrimSpace(pk)
if trimmed == "" {
continue
}
placeholders = append(placeholders, "LOWER(?)")
args = append(args, trimmed)
}
if len(placeholders) == 0 {
return
}
query := "UPDATE observers SET inactive = 1 WHERE LOWER(id) IN (" + strings.Join(placeholders, ",") + ") AND (inactive IS NULL OR inactive = 0)"
result, err := rw.Exec(query, args...)
if err != nil {
log.Printf("[observer-blacklist] warning: soft-delete failed: %v", err)
return
}
if n, _ := result.RowsAffected(); n > 0 {
log.Printf("[observer-blacklist] soft-deleted %d blacklisted observer(s)", n)
}
}
// resolvePathForObs resolves hop prefixes to full pubkeys for an observation.
// Returns nil if path is empty.
func resolvePathForObs(pathJSON, observerID string, tx *StoreTx, pm *prefixMap, graph *NeighborGraph) []*string {
+159
View File
@@ -0,0 +1,159 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestConfigIsObserverBlacklisted(t *testing.T) {
cfg := &Config{
ObserverBlacklist: []string{"OBS1", "obs2", " Obs3 "},
}
tests := []struct {
id string
want bool
}{
{"OBS1", true},
{"obs1", true}, // case-insensitive
{"OBS2", true},
{"Obs3", true}, // whitespace trimmed
{"obs4", false},
{"", false},
}
for _, tt := range tests {
got := cfg.IsObserverBlacklisted(tt.id)
if got != tt.want {
t.Errorf("IsObserverBlacklisted(%q) = %v, want %v", tt.id, got, tt.want)
}
}
}
func TestConfigIsObserverBlacklistedEmpty(t *testing.T) {
cfg := &Config{}
if cfg.IsObserverBlacklisted("anything") {
t.Error("empty blacklist should not match anything")
}
}
func TestConfigIsObserverBlacklistedNil(t *testing.T) {
var cfg *Config
if cfg.IsObserverBlacklisted("anything") {
t.Error("nil config should not match anything")
}
}
func TestObserverBlacklistFiltersHandleObservers(t *testing.T) {
db := setupTestDB(t)
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('goodobs', 'GoodObs', 'SFO', datetime('now'))")
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
cfg := &Config{
ObserverBlacklist: []string{"badobs"},
}
srv := NewServer(db, cfg, NewHub())
srv.RegisterRoutes(setupTestRouter(srv))
req := httptest.NewRequest("GET", "/api/observers", nil)
w := httptest.NewRecorder()
srv.router.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp ObserverListResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
for _, obs := range resp.Observers {
if obs.ID == "badobs" {
t.Error("blacklisted observer should not appear in observers list")
}
}
foundGood := false
for _, obs := range resp.Observers {
if obs.ID == "goodobs" {
foundGood = true
}
}
if !foundGood {
t.Error("non-blacklisted observer should appear in observers list")
}
}
func TestObserverBlacklistFiltersObserverDetail(t *testing.T) {
db := setupTestDB(t)
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('badobs', 'BadObs', 'LAX', datetime('now'))")
cfg := &Config{
ObserverBlacklist: []string{"badobs"},
}
srv := NewServer(db, cfg, NewHub())
srv.RegisterRoutes(setupTestRouter(srv))
req := httptest.NewRequest("GET", "/api/observers/badobs", nil)
w := httptest.NewRecorder()
srv.router.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404 for blacklisted observer detail, got %d", w.Code)
}
}
func TestNoObserverBlacklistPassesAll(t *testing.T) {
db := setupTestDB(t)
db.conn.Exec("INSERT OR IGNORE INTO observers (id, name, iata, last_seen) VALUES ('someobs', 'SomeObs', 'SFO', datetime('now'))")
cfg := &Config{}
srv := NewServer(db, cfg, NewHub())
srv.RegisterRoutes(setupTestRouter(srv))
req := httptest.NewRequest("GET", "/api/observers", nil)
w := httptest.NewRecorder()
srv.router.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp ObserverListResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
foundSome := false
for _, obs := range resp.Observers {
if obs.ID == "someobs" {
foundSome = true
}
}
if !foundSome {
t.Error("without blacklist, observer should appear")
}
}
func TestObserverBlacklistConcurrent(t *testing.T) {
cfg := &Config{
ObserverBlacklist: []string{"AA", "BB", "CC"},
}
done := make(chan struct{})
for i := 0; i < 50; i++ {
go func() {
defer func() { done <- struct{}{} }()
for j := 0; j < 100; j++ {
cfg.IsObserverBlacklisted("AA")
cfg.IsObserverBlacklisted("DD")
}
}()
}
for i := 0; i < 50; i++ {
<-done
}
}
+78
View File
@@ -0,0 +1,78 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gorilla/mux"
)
// TestHandleNodePaths_PrefixCollisionExclusion verifies that paths through a node
// sharing a 2-char prefix with another node are not returned as false positives
// when they have no resolved_path data (issue #929).
//
// Setup:
// - nodeA (target): pubkey starts with "7a", no GPS
// - nodeB (other): pubkey starts with "7a", has GPS → "7a" resolves to nodeB
// - tx1: path ["7a"], resolved_path NULL → false positive candidate, must be excluded
// - tx2: path ["7a"], resolved_path contains nodeA pubkey → SQL-confirmed, must be included
func TestHandleNodePaths_PrefixCollisionExclusion(t *testing.T) {
db := setupTestDB(t)
recent := time.Now().Add(-1 * time.Hour).Format(time.RFC3339)
recentEpoch := time.Now().Add(-1 * time.Hour).Unix()
nodeAPK := "7acb1111aaaabbbb"
nodeBPK := "7aff2222ccccdddd" // same "7a" prefix, has GPS so resolveHop("7a") picks B
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count)
VALUES (?, 'NodeA', 'repeater', 0, 0, ?, '2026-01-01', 1)`, nodeAPK, recent)
db.conn.Exec(`INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen, advert_count)
VALUES (?, 'NodeB', 'repeater', 37.5, -122.0, ?, '2026-01-01', 1)`, nodeBPK, recent)
// tx1: no resolved_path — should be excluded by hop-level check
db.conn.Exec(`INSERT INTO transmissions (id, raw_hex, hash, first_seen) VALUES (10, 'AA', 'hash_fp', ?)`, recent)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp, resolved_path)
VALUES (10, NULL, '["7a"]', ?, NULL)`, recentEpoch)
// tx2: resolved_path confirms nodeA — must be included
db.conn.Exec(`INSERT INTO transmissions (id, raw_hex, hash, first_seen) VALUES (11, 'BB', 'hash_tp', ?)`, recent)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, path_json, timestamp, resolved_path)
VALUES (11, NULL, '["7a"]', ?, ?)`, recentEpoch, `["`+nodeAPK+`"]`)
cfg := &Config{Port: 3000}
hub := NewHub()
srv := NewServer(db, cfg, hub)
store := NewPacketStore(db, nil)
if err := store.Load(); err != nil {
t.Fatalf("store.Load: %v", err)
}
srv.store = store
router := mux.NewRouter()
srv.RegisterRoutes(router)
req := httptest.NewRequest("GET", "/api/nodes/"+nodeAPK+"/paths", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp NodePathsResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v", err)
}
// Only the SQL-confirmed path (tx2) should be present; tx1 (false positive) must be excluded.
// tx1 and tx2 share the same raw path ["7a"] so they collapse into 1 unique path group.
// If tx1 were included, TotalTransmissions would be 2.
if resp.TotalPaths != 1 {
t.Errorf("expected 1 path group, got %d", resp.TotalPaths)
}
if resp.TotalTransmissions != 1 {
t.Errorf("expected 1 transmission (false positive tx1 excluded), got %d", resp.TotalTransmissions)
}
}
+43 -6
View File
@@ -118,6 +118,9 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/config/map", s.handleConfigMap).Methods("GET")
r.HandleFunc("/api/config/geo-filter", s.handleConfigGeoFilter).Methods("GET")
// Readiness endpoint (gated on background init completion)
r.HandleFunc("/api/healthz", s.handleHealthz).Methods("GET")
// System endpoints
r.HandleFunc("/api/health", s.handleHealth).Methods("GET")
r.HandleFunc("/api/stats", s.handleStats).Methods("GET")
@@ -1258,25 +1261,31 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
_, pm := s.store.getCachedNodesAndPM()
// Collect candidate transmissions from the index, deduplicating by tx ID.
// confirmedByFullKey tracks TXs found via the full-pubkey index key — these are
// already resolved_path-confirmed and bypass the hop-level check below.
confirmedByFullKey := make(map[int]bool)
seen := make(map[int]bool)
var candidates []*StoreTx
addCandidates := func(key string) {
addCandidates := func(key string, confirmed bool) {
for _, tx := range s.store.byPathHop[key] {
if !seen[tx.ID] {
seen[tx.ID] = true
if confirmed {
confirmedByFullKey[tx.ID] = true
}
candidates = append(candidates, tx)
}
}
}
addCandidates(lowerPK) // full pubkey match (from resolved_path)
addCandidates(prefix1) // 2-char raw hop match
addCandidates(prefix2) // 4-char raw hop match
addCandidates(lowerPK, true) // full pubkey match (from resolved_path) → confirmed
addCandidates(prefix1, false) // 2-char raw hop match
addCandidates(prefix2, false) // 4-char raw hop match
// Also check any raw hops that start with prefix2 (longer prefixes).
// Raw hops are typically 2 chars, so iterate only keys with HasPrefix
// on the small set of index keys rather than all packets.
for key := range s.store.byPathHop {
if len(key) > 4 && len(key) < len(lowerPK) && strings.HasPrefix(key, prefix2) {
addCandidates(key)
addCandidates(key, false)
}
}
@@ -1313,6 +1322,7 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
s.store.mu.RUnlock()
// Now run SQL checks outside the lock for candidates that need confirmation.
confirmedBySQL := make(map[int]bool)
filtered := candidates[:0]
for _, cc := range checks {
if cc.inIndex {
@@ -1320,6 +1330,7 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
} else if cc.hasReverse {
if s.store.confirmResolvedPathContains(cc.tx.ID, lowerPK) {
filtered = append(filtered, cc.tx)
confirmedBySQL[cc.tx.ID] = true
}
}
// else: not in index → exclude
@@ -1347,10 +1358,14 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
return r
}
for _, tx := range candidates {
totalTransmissions++
hops := txGetParsedPath(tx)
resolvedHops := make([]PathHopResp, len(hops))
sigParts := make([]string, len(hops))
// For candidates not confirmed via full-pubkey index or SQL, verify that at
// least one hop actually resolves to the target. This catches prefix collisions
// (e.g. two nodes sharing a "7a" 1-byte prefix) that slipped through the
// conservative resolved_path fallback.
containsTarget := confirmedByFullKey[tx.ID] || confirmedBySQL[tx.ID]
for i, hop := range hops {
resolved := resolveHop(hop)
entry := PathHopResp{Prefix: hop, Name: hop}
@@ -1362,11 +1377,22 @@ func (s *Server) handleNodePaths(w http.ResponseWriter, r *http.Request) {
entry.Lon = resolved.Lon
}
sigParts[i] = resolved.PublicKey
if strings.ToLower(resolved.PublicKey) == lowerPK {
containsTarget = true
}
} else {
sigParts[i] = hop
// Unresolvable hop: keep conservative if prefix could be the target.
if strings.HasPrefix(lowerPK, strings.ToLower(hop)) {
containsTarget = true
}
}
resolvedHops[i] = entry
}
if !containsTarget {
continue
}
totalTransmissions++
sig := strings.Join(sigParts, "→")
agg := pathGroups[sig]
@@ -1929,6 +1955,10 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
result := make([]ObserverResp, 0, len(observers))
for _, o := range observers {
// Defense in depth: skip observers that are in the blacklist
if s.cfg != nil && s.cfg.IsObserverBlacklisted(o.ID) {
continue
}
plh := 0
if c, ok := pktCounts[o.ID]; ok {
plh = c
@@ -1960,6 +1990,13 @@ func (s *Server) handleObservers(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
// Defense in depth: reject blacklisted observer
if s.cfg != nil && s.cfg.IsObserverBlacklisted(id) {
writeError(w, 404, "Observer not found")
return
}
obs, err := s.db.GetObserverByID(id)
if err != nil || obs == nil {
writeError(w, 404, "Observer not found")
+5 -4
View File
@@ -4,7 +4,7 @@
// --- Route/Payload name maps ---
const ROUTE_TYPES = { 0: 'TRANSPORT_FLOOD', 1: 'FLOOD', 2: 'DIRECT', 3: 'TRANSPORT_DIRECT' };
const PAYLOAD_TYPES = { 0: 'Request', 1: 'Response', 2: 'Direct Msg', 3: 'ACK', 4: 'Advert', 5: 'Channel Msg', 6: 'Group Data', 7: 'Anon Req', 8: 'Path', 9: 'Trace', 10: 'Multipart', 11: 'Control', 15: 'Raw Custom' };
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 7: 'anon-req', 8: 'path', 9: 'trace' };
const PAYLOAD_COLORS = { 0: 'req', 1: 'response', 2: 'txt-msg', 3: 'ack', 4: 'advert', 5: 'grp-txt', 6: 'grp-data', 7: 'anon-req', 8: 'path', 9: 'trace' };
function routeTypeName(n) { return ROUTE_TYPES[n] || 'UNKNOWN'; }
function payloadTypeName(n) { return PAYLOAD_TYPES[n] || 'UNKNOWN'; }
@@ -965,10 +965,11 @@ window.addEventListener('DOMContentLoaded', () => {
}).catch(() => {
window.SITE_CONFIG = { timestamps: { defaultMode: 'ago', timezone: 'local', formatPreset: 'iso', customFormat: '', allowCustomFormat: false } };
if (window._customizerV2) window._customizerV2.init(window.SITE_CONFIG);
}).finally(() => {
if (!location.hash || location.hash === '#/') location.hash = '#/home';
else navigate();
});
// Navigate immediately — don't gate data-fetching pages on cosmetic theme fetch
if (!location.hash || location.hash === '#/') location.hash = '#/home';
else navigate();
});
/**
+5
View File
@@ -595,6 +595,11 @@
// Only fitBounds on subsequent data refreshes if user hasn't manually panned
} catch (e) {
console.error('Map load error:', e);
} finally {
// Always signal data-loaded — even on error — so E2E tests can proceed.
// Otherwise an api() failure leaves the test waiting forever.
var mapContainer = document.getElementById('leaflet-map');
if (mapContainer) mapContainer.setAttribute('data-loaded', 'true');
}
}
+41 -1
View File
@@ -815,6 +815,41 @@
* Shared between the full-screen detail page and the side panel (#813, #690).
* No-op if the container is missing, the API errors, or the response lacks severity.
*/
/** Build collapsible evidence panel for node clock skew card */
function buildEvidencePanel(cs) {
var evidence = cs.recentHashEvidence;
if (!evidence || evidence.length === 0) return '';
var calSum = cs.calibrationSummary || {};
var calLine = calSum.totalSamples
? '<div style="font-size:11px;color:var(--text-muted);margin-bottom:6px">Last ' + calSum.totalSamples + ' samples: ' + (calSum.calibratedSamples || 0) + ' corrected via observer calibration, ' + (calSum.uncalibratedSamples || 0) + ' uncorrected (single-observer).</div>'
: '';
// Severity reason.
var skewVal = window.currentSkewValue(cs);
var sampleCount = (cs.samples || []).length;
var sevLabel = SKEW_SEVERITY_LABELS[cs.severity] || cs.severity;
var reasonLine = '<div style="font-size:12px;margin-bottom:8px"><strong>Recent ' + sampleCount + ' adverts median ' + formatSkew(skewVal) + ' → ' + sevLabel + '</strong></div>';
var hashBlocks = evidence.map(function(ev) {
var shortHash = (ev.hash || '').substring(0, 8) + '…';
var obsCount = ev.observers ? ev.observers.length : 0;
var header = '<div style="font-weight:600;font-size:12px;margin-top:6px">Hash ' + shortHash + ' · ' + obsCount + ' observer' + (obsCount !== 1 ? 's' : '') + ' · median corrected: ' + formatSkew(ev.medianCorrectedSkewSec) + '</div>';
var lines = (ev.observers || []).map(function(o) {
var name = o.observerName || o.observerID;
return '<div style="font-size:11px;padding-left:16px;font-family:var(--mono)">' +
name + ' raw=' + formatSkew(o.rawSkewSec) + ' corrected=' + formatSkew(o.correctedSkewSec) + ' (observer offset ' + formatSkew(o.observerOffsetSec) + ')' +
'</div>';
}).join('');
return header + lines;
}).join('');
return '<details style="margin-top:10px"><summary style="cursor:pointer;font-size:12px;color:var(--text-muted)">Evidence (' + evidence.length + ' hashes)</summary>' +
'<div style="margin-top:6px;padding:8px;background:var(--bg-secondary);border-radius:6px">' +
reasonLine + calLine + hashBlocks +
'</div></details>';
}
async function loadClockSkewInto(container, pubkey) {
if (!container) return;
try {
@@ -841,7 +876,8 @@
'</div>' +
driftHtml +
(sparkHtml ? '<div class="skew-sparkline-wrap" style="margin-top:8px">' + sparkHtml + '<div style="font-size:10px;color:var(--text-muted)">Skew over time (' + (cs.samples || []).length + ' samples)</div></div>' : '') +
bimodalWarning;
bimodalWarning +
buildEvidencePanel(cs);
} catch (e) {
// Non-fatal — section stays hidden
}
@@ -955,6 +991,10 @@
console.error('Failed to load nodes:', e);
const tbody = document.getElementById('nodesBody');
if (tbody) tbody.innerHTML = '<tr><td colspan="6" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load nodes. Please try again.</div></td></tr>';
} finally {
// Always signal data-loaded — even on error — so E2E tests can proceed.
var nodesContainer = document.getElementById('nodesLeft') || document.getElementById('nodesBody');
if (nodesContainer) nodesContainer.setAttribute('data-loaded', 'true');
}
}
+21 -3
View File
@@ -70,18 +70,24 @@
try {
destroyCharts();
chartDefaults();
const [obs, analytics] = await Promise.all([
const [obs, analytics, obsSkewArr] = await Promise.all([
api('/observers/' + encodeURIComponent(currentId)),
api('/observers/' + encodeURIComponent(currentId) + '/analytics?days=' + currentDays),
api('/observers/clock-skew', { ttl: 30000 }).catch(function() { return []; }),
]);
renderDetail(obs, analytics);
// Find this observer's calibration data.
var obsSkew = null;
(Array.isArray(obsSkewArr) ? obsSkewArr : []).forEach(function(s) {
if (s && s.observerID === currentId) obsSkew = s;
});
renderDetail(obs, analytics, obsSkew);
} catch (e) {
document.getElementById('obsDetailContent').innerHTML =
'<div class="text-muted" style="padding:40px">Error: ' + e.message + '</div>';
}
}
function renderDetail(obs, analytics) {
function renderDetail(obs, analytics, obsSkew) {
const el = document.getElementById('obsDetailContent');
if (!el) return;
@@ -154,6 +160,18 @@
<div class="mono" style="font-size:0.75em;color:var(--text-muted);margin-bottom:20px;word-break:break-all">
ID: ${obs.id}
</div>
${obsSkew && obsSkew.samples > 0 ? `
<div class="node-full-card skew-detail-section" style="margin-bottom:20px;padding:12px">
<h4 style="margin:0 0 6px"> Clock Offset</h4>
<div style="display:flex;align-items:center;gap:12px;flex-wrap:wrap">
<span style="font-size:18px;font-weight:700;font-family:var(--mono)">${formatSkew(obsSkew.offsetSec)}</span>
${renderSkewBadge(observerSkewSeverity(obsSkew.offsetSec), obsSkew.offsetSec)}
<span class="text-muted" style="font-size:12px">${obsSkew.samples} sample${obsSkew.samples !== 1 ? 's' : ''}</span>
</div>
<div style="font-size:12px;color:var(--text-muted);margin-top:8px;max-width:600px">
<strong>How this is computed:</strong> when this observer and another observer see the same packet, we compare their receive timestamps. The median deviation across all multi-observer packets is this observer's offset.
</div>
</div>` : ''}
<div class="obs-charts" style="display:grid;grid-template-columns:repeat(auto-fit,minmax(400px,1fr));gap:16px">
<div class="chart-card" style="padding:12px">
<h3 style="margin:0 0 8px;font-size:0.95em">Packets Over Time</h3>
+17 -2
View File
@@ -3,6 +3,7 @@
(function () {
let observers = [];
let obsSkewMap = {}; // observerID → {offsetSec, samples}
let wsHandler = null;
let refreshTimer = null;
let regionChangeHandler = null;
@@ -51,12 +52,20 @@
if (regionChangeHandler) RegionFilter.offChange(regionChangeHandler);
regionChangeHandler = null;
observers = [];
obsSkewMap = {};
}
async function loadObservers() {
try {
const data = await api('/observers', { ttl: CLIENT_TTL.observers });
const [data, skewData] = await Promise.all([
api('/observers', { ttl: CLIENT_TTL.observers }),
api('/observers/clock-skew', { ttl: 30000 }).catch(function() { return []; })
]);
observers = data.observers || [];
obsSkewMap = {};
(Array.isArray(skewData) ? skewData : []).forEach(function(s) {
if (s && s.observerID) obsSkewMap[s.observerID] = s;
});
render();
} catch (e) {
document.getElementById('obsContent').innerHTML =
@@ -124,7 +133,7 @@
<caption class="sr-only">Observer status and statistics</caption>
<thead><tr>
<th scope="col">Status</th><th scope="col">Name</th><th scope="col">Region</th><th scope="col">Last Seen</th>
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Uptime</th>
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Clock Offset</th><th scope="col">Uptime</th>
</tr></thead>
<tbody>${filtered.map(o => {
const h = healthStatus(o.last_seen);
@@ -136,6 +145,12 @@
<td>${timeAgo(o.last_seen)}</td>
<td>${(o.packet_count || 0).toLocaleString()}</td>
<td>${sparkBar(o.packetsLastHour || 0, maxPktsHr)}</td>
<td>${(function() {
var sk = obsSkewMap[o.id];
if (!sk || sk.samples == null || sk.samples === 0) return '<span class="text-muted">—</span>';
var sev = observerSkewSeverity(sk.offsetSec);
return renderSkewBadge(sev, sk.offsetSec) + ' <span class="text-muted" title="Computed from ' + sk.samples + ' multi-observer packets. Positive = observer ahead of consensus.">(' + sk.samples + ')</span>';
})()}</td>
<td>${uptimeStr(o.first_seen)}</td>
</tr>`;
}).join('')}</tbody>
+5 -1
View File
@@ -26,7 +26,7 @@
let observers = [];
let observerMap = new Map(); // id → observer for O(1) lookups (#383)
let regionMap = {};
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
const TYPE_NAMES = { 0:'Request', 1:'Response', 2:'Direct Msg', 3:'ACK', 4:'Advert', 5:'Channel Msg', 6:'Group Data', 7:'Anon Req', 8:'Path', 9:'Trace', 11:'Control' };
function typeName(t) { return TYPE_NAMES[t] ?? `Type ${t}`; }
const isMobile = window.innerWidth <= 1024;
const PACKET_LIMIT = isMobile ? 1000 : 50000;
@@ -748,6 +748,10 @@
console.error('Failed to load packets:', e);
const tbody = document.getElementById('pktBody');
if (tbody) tbody.innerHTML = '<tr><td colspan="' + _getColCount() + '" class="text-center" style="padding:24px;color:var(--error,#ef4444)"><div role="alert" aria-live="polite">Failed to load packets. Please try again.</div></td></tr>';
} finally {
// Always signal data-loaded — even on error — so E2E tests can proceed.
var pktContainer = document.getElementById('pktLeft') || document.getElementById('pktBody');
if (pktContainer) pktContainer.setAttribute('data-loaded', 'true');
}
}
+8 -2
View File
@@ -15,14 +15,14 @@
};
window.TYPE_COLORS = {
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
ADVERT: '#22c55e', GRP_TXT: '#3b82f6', GRP_DATA: '#8b5cf6', TXT_MSG: '#f59e0b', ACK: '#6b7280',
REQUEST: '#a855f7', RESPONSE: '#06b6d4', TRACE: '#ec4899', PATH: '#14b8a6',
ANON_REQ: '#f43f5e', UNKNOWN: '#6b7280'
};
// Badge CSS class name mapping
const TYPE_BADGE_MAP = {
ADVERT: 'advert', GRP_TXT: 'grp-txt', TXT_MSG: 'txt-msg', ACK: 'ack',
ADVERT: 'advert', GRP_TXT: 'grp-txt', GRP_DATA: 'grp-data', TXT_MSG: 'txt-msg', ACK: 'ack',
REQUEST: 'req', RESPONSE: 'response', TRACE: 'trace', PATH: 'path',
ANON_REQ: 'anon-req', UNKNOWN: 'unknown'
};
@@ -455,6 +455,12 @@
return '<span class="' + cls + '" title="Clock skew: ' + window.formatSkew(skewSec) + ' (' + (SKEW_SEVERITY_LABELS[severity] || severity) + ')">' + label + '</span>';
};
/** Compute severity for an observer's clock offset (seconds). */
window.observerSkewSeverity = function(offsetSec) {
var abs = Math.abs(offsetSec);
return abs >= 3600 ? 'critical' : abs >= 300 ? 'warning' : 'ok';
};
/** Render a skew sparkline SVG (inline, word-sized) */
window.renderSkewSparkline = function(samples, w, h) {
w = w || 120; h = h || 24;
+7 -2
View File
@@ -211,6 +211,7 @@ async function run() {
// Test 2: Nodes page loads with data
await test('Nodes page loads with data', async () => {
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
await page.waitForSelector('table tbody tr');
const headers = await page.$$eval('th', els => els.map(e => e.textContent.trim()));
for (const col of ['Name', 'Public Key', 'Role']) {
@@ -236,6 +237,7 @@ async function run() {
// Test: Node side panel Details link navigates to full detail page (#778)
await test('Node side panel Details link navigates', async () => {
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
await page.waitForSelector('table tbody tr');
await page.click('table tbody tr');
await page.waitForSelector('.node-detail');
@@ -257,6 +259,7 @@ async function run() {
// Test: Nodes page has WebSocket auto-update listener (#131)
await test('Nodes page has WebSocket auto-update', async () => {
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded' });
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
await page.waitForSelector('table tbody tr');
// The live dot in navbar indicates WS connection status
const liveDot = await page.$('#liveDot');
@@ -282,11 +285,12 @@ async function run() {
// Test 3: Map page loads with markers
await test('Map page loads with markers', async () => {
await page.goto(`${BASE}/#/map`, { waitUntil: 'domcontentloaded' });
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
await page.waitForSelector('.leaflet-container');
await page.waitForSelector('.leaflet-tile-loaded');
// Wait for markers/overlays to render (may not exist with empty DB)
try {
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 3000 });
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive, circle, .marker-cluster, .leaflet-marker-pane > *, .leaflet-overlay-pane svg path, .leaflet-overlay-pane svg circle', { timeout: 8000 });
} catch (_) {
// No markers with empty DB \u2014 assertion below handles it
}
@@ -362,7 +366,7 @@ async function run() {
await page.waitForSelector('.leaflet-container');
// Wait for markers (may not exist with empty DB)
try {
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 3000 });
await page.waitForSelector('.leaflet-marker-icon, .leaflet-interactive', { timeout: 8000 });
} catch (_) {
// No markers with empty DB
}
@@ -394,6 +398,7 @@ async function run() {
await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded' });
await page.evaluate(() => localStorage.setItem('meshcore-time-window', '525600'));
await page.reload({ waitUntil: 'load' });
await page.waitForSelector('[data-loaded="true"]', { timeout: 15000 });
await page.waitForSelector('table tbody tr', { timeout: 15000 });
const rowsBefore = await page.$$('table tbody tr');
assert(rowsBefore.length > 0, 'No packets visible');
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env bash
# freshen-fixture.sh — Shift all timestamps in the fixture DB to be relative to now.
# Preserves the relative ordering between timestamps.
# Usage: bash tools/freshen-fixture.sh <path-to-fixture.db>
set -euo pipefail
DB="${1:?Usage: freshen-fixture.sh <path-to-fixture.db>}"
if [ ! -f "$DB" ]; then
echo "ERROR: DB file not found: $DB" >&2
exit 1
fi
# Find the max timestamp across all time columns, compute offset, shift everything forward.
sqlite3 "$DB" <<'SQL'
-- Shift all timestamps forward so the newest is ~now, preserving relative ordering.
-- Use strftime to produce RFC3339 format (T separator + Z suffix) for correct comparison.
UPDATE nodes SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM nodes)
) WHERE last_seen IS NOT NULL;
UPDATE transmissions SET first_seen = strftime('%Y-%m-%dT%H:%M:%SZ', first_seen,
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(first_seen))) * 86400 AS INTEGER)) FROM transmissions)
) WHERE first_seen IS NOT NULL;
-- Observers: shift last_seen too so they don't get auto-pruned by RemoveStaleObservers
-- on server startup (default 14d threshold marks all >14d observers inactive=1, which
-- the /api/observers filter then excludes — leaving the map page with no observer markers).
UPDATE observers SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen,
(SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM observers)
) WHERE last_seen IS NOT NULL;
SQL
# Defensive: clear any stale inactive=1 flags. Column may not exist on fresh fixtures
# (added by server migration on first startup); silently no-op if missing.
sqlite3 "$DB" "UPDATE observers SET inactive = 0 WHERE inactive = 1;" 2>/dev/null || true
# neighbor_edges may not exist in all fixture versions
sqlite3 "$DB" "UPDATE neighbor_edges SET last_seen = strftime('%Y-%m-%dT%H:%M:%SZ', last_seen, (SELECT printf('+%d seconds', CAST((julianday('now') - julianday(MAX(last_seen))) * 86400 AS INTEGER)) FROM neighbor_edges)) WHERE last_seen IS NOT NULL;" 2>/dev/null || true
echo "Fixture timestamps freshened in $DB"
sqlite3 "$DB" "SELECT 'nodes: min=' || MIN(last_seen) || ' max=' || MAX(last_seen) FROM nodes;"
sqlite3 "$DB" "SELECT 'observers: count=' || COUNT(*) || ' max=' || MAX(last_seen) FROM observers;"