Compare commits


8 Commits

Author SHA1 Message Date
you
331dc0090e test: add load test with throughput and latency metrics
TestLoadTestThroughput: 1000 messages × 4 writes each = 4000 writes,
20 concurrent goroutines. Reports msgs/sec, p50/p95/p99 latency,
SQLITE_BUSY count, and total errors. Hard-asserts zero BUSY errors.
2026-03-28 16:54:06 +00:00
you
cef8156a86 fix: set MaxIdleConns(1) to match MaxOpenConns(1)
Prevents unnecessary connection close/reopen churn from the default
MaxIdleConns(2) when only 1 connection is ever open.
2026-03-28 16:37:56 +00:00
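The churn this commit removes can be made visible with a stub `database/sql` driver that counts `Open` calls: with the idle limit below the open limit, the pool closes the sole connection after each use and reopens it on the next. Everything here (`drv`, `countOpens`, the `"count"` driver name) is a hypothetical illustration, not code from the repo:

```go
package main

import (
	"database/sql"
	"database/sql/driver"
	"fmt"
	"io"
	"sync/atomic"
)

var opens atomic.Int64 // counts physical connection opens

type drv struct{}
type conn struct{}
type stmt struct{}

func (drv) Open(string) (driver.Conn, error)     { opens.Add(1); return conn{}, nil }
func (conn) Prepare(string) (driver.Stmt, error) { return stmt{}, nil }
func (conn) Close() error                        { return nil }
func (conn) Begin() (driver.Tx, error)           { return nil, io.EOF }
func (stmt) Close() error                        { return nil }
func (stmt) NumInput() int                       { return 0 }
func (stmt) Exec([]driver.Value) (driver.Result, error) { return driver.RowsAffected(0), nil }
func (stmt) Query([]driver.Value) (driver.Rows, error)  { return nil, io.EOF }

func init() { sql.Register("count", drv{}) }

// countOpens runs 5 statements through a 1-connection pool with the
// given idle limit and reports how many times the driver was opened.
func countOpens(idle int) int64 {
	opens.Store(0)
	db, _ := sql.Open("count", "x")
	defer db.Close()
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(idle)
	for i := 0; i < 5; i++ {
		db.Exec("noop")
	}
	return opens.Load()
}

func main() {
	fmt.Println("idle=0 opens:", countOpens(0)) // connection churns per call
	fmt.Println("idle=1 opens:", countOpens(1)) // single connection reused
}
```

With `MaxIdleConns` at least as large as `MaxOpenConns(1)`, the one connection is returned to the idle pool and reused, which is exactly what the fix ensures.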
you
9751141ffc feat: add observability metrics and concurrency tests
Observability:
- Add DBStats struct with atomic counters for tx_inserted, tx_dupes,
  obs_inserted, node_upserts, observer_upserts, write_errors
- Log SQLite config on startup (busy_timeout, max_open_conns, journal)
- Periodic stats logging every 5 minutes + final stats on shutdown
- Instrument all write paths with counter increments

Tests:
- TestConcurrentWrites: 20 goroutines × 50 writes (1000 total) with
  interleaved InsertTransmission + UpsertNode + UpsertObserver calls.
  Verifies zero errors and data integrity under concurrent load.
- TestDBStats: verifies counter accuracy for inserts, duplicates,
  upserts, and that LogStats does not panic
2026-03-28 16:36:50 +00:00
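The counter pattern behind DBStats can be sketched with `sync/atomic` integers: hot write paths bump lock-free counters, and a logger reads consistent snapshots without a mutex. The `Stats` type and totals below are illustrative, not the repo's actual struct:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stats mirrors the DBStats idea: lock-free counters that concurrent
// write paths can increment without contending on a shared mutex.
type Stats struct {
	Inserted atomic.Int64
	Dupes    atomic.Int64
	Errors   atomic.Int64
}

// Log prints a point-in-time snapshot; safe to call from a ticker
// goroutine (e.g. every 5 minutes) or at shutdown.
func (s *Stats) Log() {
	fmt.Printf("stats: inserted=%d dupes=%d errors=%d\n",
		s.Inserted.Load(), s.Dupes.Load(), s.Errors.Load())
}

func main() {
	var st Stats
	var wg sync.WaitGroup
	// 20 goroutines × 50 writes, echoing the TestConcurrentWrites shape.
	for g := 0; g < 20; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				st.Inserted.Add(1)
			}
		}()
	}
	wg.Wait()
	st.Log() // all 1000 increments are observed
}
```

`atomic.Int64` (Go 1.19+) avoids the alignment pitfalls of the older `atomic.AddInt64` on a plain field, which matters for structs embedded in larger types.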
you
9c5ffbfb0c fix: resolve SQLite SQLITE_BUSY write contention in ingestor
Three changes to eliminate concurrent write collisions:

1. Add _busy_timeout=5000 to ingestor SQLite DSN (matches server)
   - SQLite will wait up to 5s for the write lock instead of
     immediately returning SQLITE_BUSY

2. Set SetMaxOpenConns(1) on ingestor DB connection pool
   - Serializes all DB access at the Go sql.DB level
   - Prevents multiple goroutines from opening overlapping writes

3. Change SetOrderMatters(false) to SetOrderMatters(true)
   - MQTT handlers now run sequentially per client
   - Eliminates concurrent handler execution that caused
     overlapping multi-statement write flows

Root cause: concurrent MQTT handlers (SetOrderMatters=false) each
performed multiple separate writes (transmission lookup/insert,
observation insert, node upsert, observer upsert) without transactions
or connection limits. SQLite only permits one writer at a time, so
under bursty MQTT traffic the ingestor was competing with itself.
2026-03-28 16:16:07 +00:00
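The DSN and pool changes from this commit can be sketched as two small helpers. The pragma-style DSN below follows the `modernc.org/sqlite` syntax used elsewhere in this diff (`_pragma=busy_timeout(5000)`); the helper names are illustrative:

```go
package main

import (
	"database/sql"
	"fmt"
)

// ingestorDSN builds a SQLite DSN with WAL journaling, foreign keys,
// and a 5s busy timeout, assuming modernc.org/sqlite pragma syntax.
func ingestorDSN(path string) string {
	return path + "?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)"
}

// configurePool serializes all access through one connection so that
// concurrent goroutines queue at the database/sql layer instead of
// colliding on SQLite's single write lock.
func configurePool(db *sql.DB) {
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
}

func main() {
	fmt.Println(ingestorDSN("/data/ingest.db"))
}
```

Note the two knobs are complementary: `busy_timeout` makes SQLite wait out a lock held by another process, while `SetMaxOpenConns(1)` prevents the ingestor's own goroutines from ever racing each other for it.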
Kpa-clawbot
3361643bc0 fix: #208 search results keyboard accessible — tabindex, role, arrow-key nav
- Search result items: tabindex='0', role='option', data-href (replaces inline onclick)
- Delegated click handler via activateSearchItem()
- Keydown handler: Enter/Space activates, ArrowDown/ArrowUp navigates items
- ArrowDown from search input focuses first result
- searchResults container: role='listbox'
- Bump cache busters

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 02:43:09 -07:00
Kpa-clawbot
f04f1b8e77 fix: accessibility — chart labels, table scope, form labels (#210, #211, #212)
#210: Add role="img" aria-label to 9 Chart.js canvases in node-analytics.js
and observer-detail.js with descriptive labels.

#211: Add scope="col" to all <th> elements across analytics.js, audio-lab.js,
compare.js, node-analytics.js, nodes.js, observer-detail.js, observers.js,
and packets.js (40+ headers).

#212: Add aria-label to packet filter input and time window select in
packets.js. Add for/id associations to all customize.js inputs: branding,
theme colors, node/type colors, heatmap sliders, onboarding fields, and
export controls.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 02:42:01 -07:00
Kpa-clawbot
447c5d7073 fix: mobile responsive — #203 live bottom-sheet, #204 perf layout, #205 nodes col-hide
#203: Live page node detail panel becomes a bottom-sheet on mobile
      (width:100%, bottom:0, max-height:60vh, rounded top corners).
#204: Perf page reduces padding to 12px, perf-cards stack in 2-col
      grid, tables get smaller font/padding on mobile.
#205: Nodes table hides Public Key column on mobile via .col-pubkey
      class (same pattern as packets page .col-region/.col-rpt).

Cache busters bumped in index.html.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 02:38:23 -07:00
Kpa-clawbot
aa2e8ed420 ci: remove Node deploy steps, update badges for Go
- Remove build-node and deploy-node jobs (Node staging on port 81)
- Rename build-go → build and deploy-go → deploy
- Update publish job to depend only on deploy (not deploy-node)
- Update README badges to show Go coverage (server/ingestor) instead of Node backend
- Remove Node staging references from deployment summary
- node-test job remains (frontend tests + Playwright)

Pipeline is now: node-test + go-test → build → deploy → publish

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 01:59:31 -07:00
19 changed files with 1503 additions and 1326 deletions


@@ -16,14 +16,14 @@ concurrency:
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true
# Pipeline (TWO INDEPENDENT TRACKS):
# Track 1 (Node): node-test → build-node → deploy-node ──┐
# Track 2 (Go): go-test → build-go → deploy-go ──┼──→ publish
# └─ (both wait)
# Pipeline:
# node-test (frontend tests) ──┐
# go-test ├──→ build → deploy → publish
# └─ (both wait)
#
# Proto validation flow:
# 1. go-test job: verify .proto files compile (syntax check)
# 2. deploy-node job: capture fresh fixtures from prod, validate protos match actual API responses
# 2. deploy job: capture fresh fixtures from prod, validate protos match actual API responses
jobs:
# ───────────────────────────────────────────────────────────────
@@ -266,35 +266,10 @@ jobs:
if-no-files-found: ignore
# ───────────────────────────────────────────────────────────────
# 3. Build Node Docker Image — Track 1
# 3. Build Docker Image
# ───────────────────────────────────────────────────────────────
build-node:
name: "🏗️ Build Node Docker Image"
needs: [node-test]
runs-on: self-hosted
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Node.js 22
uses: actions/setup-node@v4
with:
node-version: '22'
- name: Validate JavaScript syntax
run: sh scripts/validate.sh
- name: Build Node.js Docker image
run: |
echo "${GITHUB_SHA::7}" > .git-commit
docker build -t meshcore-analyzer:latest .
echo "Built meshcore-analyzer:latest ($(git rev-parse --short HEAD))"
# ───────────────────────────────────────────────────────────────
# 4. Build Go Docker Image — Track 2
# ───────────────────────────────────────────────────────────────
build-go:
name: "🏗️ Build Go Docker Image"
build:
name: "🏗️ Build Docker Image"
needs: [go-test]
runs-on: self-hosted
steps:
@@ -315,267 +290,17 @@ jobs:
echo "Built Go staging image"
# ───────────────────────────────────────────────────────────────
# 5. Deploy Node Staging — start on port 81, healthcheck, smoke test
# 4. Deploy Staging — start on port 82, healthcheck, smoke test
# ───────────────────────────────────────────────────────────────
deploy-node:
name: "🚀 Deploy Node Staging"
needs: [build-node]
deploy:
name: "🚀 Deploy Staging"
needs: [build]
runs-on: self-hosted
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Prepare staging environment
run: |
set -e
# Source environment overrides from deploy dir or home
if [ -f /opt/meshcore-deploy/.env ]; then
set -a; source /opt/meshcore-deploy/.env; set +a
echo "Sourced /opt/meshcore-deploy/.env"
elif [ -f "$HOME/.env" ]; then
set -a; source "$HOME/.env"; set +a
echo "Sourced $HOME/.env"
else
echo "No .env found, using defaults"
fi
# Ensure data directories exist
STAGING_DIR="${STAGING_DATA_DIR:-$HOME/meshcore-staging-data}"
mkdir -p "${PROD_DATA_DIR:-$HOME/meshcore-data}" "$STAGING_DIR"
# Ensure staging has a Caddyfile (generate from template if missing)
if [ ! -f "$STAGING_DIR/Caddyfile" ]; then
cp docker/Caddyfile.staging "$STAGING_DIR/Caddyfile"
echo "Generated staging Caddyfile"
fi
# Ensure staging has a config.json (copy from prod if missing)
if [ ! -f "$STAGING_DIR/config.json" ]; then
PROD_DIR="${PROD_DATA_DIR:-$HOME/meshcore-data}"
if [ -f "$PROD_DIR/config.json" ]; then
cp "$PROD_DIR/config.json" "$STAGING_DIR/config.json"
elif [ -f /opt/meshcore-deploy/config.json ]; then
cp /opt/meshcore-deploy/config.json "$STAGING_DIR/config.json"
else
echo '{}' > "$STAGING_DIR/config.json"
echo "WARNING: No config.json found, created empty one"
fi
fi
# Copy compose file to deploy dir so manage.sh can use it
mkdir -p /opt/meshcore-deploy
cp docker-compose.yml /opt/meshcore-deploy/docker-compose.yml
- name: Start Node staging on port 81
run: |
# Force remove stale containers and volumes
docker rm -f meshcore-staging 2>/dev/null || true
docker volume prune -f 2>/dev/null || true
# Clean up stale ports
fuser -k 81/tcp 2>/dev/null || true
docker compose --profile staging up -d staging
- name: Healthcheck Node staging container
run: |
for i in $(seq 1 300); do
HEALTH=$(docker inspect meshcore-staging --format '{{.State.Health.Status}}' 2>/dev/null || echo "starting")
if [ "$HEALTH" = "healthy" ]; then
echo "Node staging healthy after ${i}s"
break
fi
if [ "$i" -eq 300 ]; then
echo "Node staging failed health check after 300s"
docker logs meshcore-staging --tail 50
exit 1
fi
sleep 1
done
- name: Smoke test Node staging API
run: |
curl -f http://localhost:81/api/stats || exit 1
curl -f http://localhost:81/api/nodes || exit 1
echo "Node staging smoke tests passed ✅"
- name: 🔍 Validate API contract (protos vs prod fixtures)
run: |
set -e
echo "Refreshing Node fixtures from staging container..."
mkdir -p proto/testdata/node-fixtures
# ─── Simple endpoints (no parameters) ──────────────────────────
ENDPOINTS=(
"stats" "health" "perf" "nodes" "packets" "observers" "channels"
"analytics/rf" "analytics/topology" "analytics/channels"
"analytics/hash-sizes" "analytics/distance" "analytics/subpaths"
"config/theme" "config/regions" "config/client" "config/cache" "config/map"
"iata-coords"
)
for endpoint in "${ENDPOINTS[@]}"; do
fixture_name=$(echo "$endpoint" | tr '/' '-')
echo " Fetching $endpoint → ${fixture_name}.json"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/$endpoint" \
> "proto/testdata/node-fixtures/${fixture_name}.json" 2>/dev/null || {
echo " ⚠ Failed to fetch $endpoint (container may not have data yet)"
}
done
# ─── Dynamic ID endpoints (require real data) ─────────────────
echo ""
echo "Fetching endpoints that require dynamic IDs..."
# Get a real pubkey from nodes
PUBKEY=$(docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes?limit=1" 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['nodes'][0]['public_key'] if d.get('nodes') and len(d['nodes']) > 0 else '')" 2>/dev/null || echo "")
if [ -n "$PUBKEY" ]; then
echo " Using pubkey: ${PUBKEY:0:16}..."
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/$PUBKEY" \
> "proto/testdata/node-fixtures/node-detail.json" 2>/dev/null && \
echo " ✓ node-detail.json"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/$PUBKEY/health" \
> "proto/testdata/node-fixtures/node-health.json" 2>/dev/null && \
echo " ✓ node-health.json"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/$PUBKEY/paths" \
> "proto/testdata/node-fixtures/node-paths.json" 2>/dev/null && \
echo " ✓ node-paths.json"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/$PUBKEY/analytics" \
> "proto/testdata/node-fixtures/node-analytics.json" 2>/dev/null && \
echo " ✓ node-analytics.json"
else
echo " ⚠ No nodes available — skipping node detail endpoints"
fi
# Node search
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/search?q=repeater" \
> "proto/testdata/node-fixtures/node-search.json" 2>/dev/null && \
echo " ✓ node-search.json" || echo " ⚠ node-search failed"
# Bulk health
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/nodes/bulk-health" \
> "proto/testdata/node-fixtures/bulk-health.json" 2>/dev/null && \
echo " ✓ bulk-health.json" || echo " ⚠ bulk-health failed"
# Get a real hash from packets
HASH=$(docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets?limit=1" 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['packets'][0]['hash'] if d.get('packets') and len(d['packets']) > 0 else '')" 2>/dev/null || echo "")
if [ -n "$HASH" ]; then
echo " Using hash: $HASH"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets/$HASH" \
> "proto/testdata/node-fixtures/packet-detail.json" 2>/dev/null && \
echo " ✓ packet-detail.json"
else
echo " ⚠ No packets available — skipping packet-detail"
fi
# ─── Per-type packet fixtures (one of each payload type) ──────
echo ""
echo "Fetching per-type packet fixtures..."
# payload_type: 0=REQ, 1=TXT_MSG, 4=ADVERT, 5=GRP_TXT
TYPES="0:req 1:txtmsg 4:advert 5:grptxt"
for entry in $TYPES; do
TYPE_NUM="${entry%%:*}"
TYPE_NAME="${entry##*:}"
TYPE_HASH=$(docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets?type=${TYPE_NUM}&limit=1" 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); pkts=d.get('packets',[]); print(pkts[0]['hash'] if pkts else '')" 2>/dev/null || echo "")
if [ -n "$TYPE_HASH" ]; then
if [ "$TYPE_NAME" = "grptxt" ]; then
# Save first as decrypted (most common), then find an undecrypted one
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets/$TYPE_HASH" \
> "proto/testdata/node-fixtures/packet-type-grptxt-decrypted.json" 2>/dev/null && \
echo " ✓ packet-type-grptxt-decrypted.json (hash: $TYPE_HASH)"
# Find a decryption_failed packet
UNDEC_HASH=$(docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets?type=5&limit=20" 2>/dev/null | python3 -c "import sys,json;d=json.load(sys.stdin);[print(p['hash']) or exit() for p in d.get('packets',[]) if 'decryption_failed' in p.get('decoded_json','') or 'no_key' in p.get('decoded_json','')]" 2>/dev/null || echo "")
if [ -n "$UNDEC_HASH" ]; then
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets/$UNDEC_HASH" \
> "proto/testdata/node-fixtures/packet-type-grptxt-undecrypted.json" 2>/dev/null && \
echo " ✓ packet-type-grptxt-undecrypted.json (hash: $UNDEC_HASH)"
else
echo " ⚠ No undecrypted GRP_TXT found"
fi
else
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets/$TYPE_HASH" \
> "proto/testdata/node-fixtures/packet-type-${TYPE_NAME}.json" 2>/dev/null && \
echo " ✓ packet-type-${TYPE_NAME}.json (hash: $TYPE_HASH)"
fi
else
echo " ⚠ No type=$TYPE_NUM ($TYPE_NAME) packets available"
fi
done
# Packet timestamps
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets/timestamps?since=2026-03-01T00:00:00Z" \
> "proto/testdata/node-fixtures/packet-timestamps.json" 2>/dev/null && \
echo " ✓ packet-timestamps.json" || echo " ⚠ packet-timestamps failed"
# Packets grouped and since
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets?limit=5&groupByHash=true" \
> "proto/testdata/node-fixtures/packets-grouped.json" 2>/dev/null && \
echo " ✓ packets-grouped.json" || echo " ⚠ packets-grouped failed"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/packets?limit=5&since=2026-03-01T00:00:00Z&groupByHash=true" \
> "proto/testdata/node-fixtures/packets-since.json" 2>/dev/null && \
echo " ✓ packets-since.json" || echo " ⚠ packets-since failed"
# Get a real observer ID
OBSID=$(docker exec meshcore-prod wget -qO- "http://localhost:3000/api/observers" 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d[0]['id'] if d and len(d) > 0 else '')" 2>/dev/null || echo "")
if [ -n "$OBSID" ]; then
echo " Using observer ID: $OBSID"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/observers/$OBSID" \
> "proto/testdata/node-fixtures/observer-detail.json" 2>/dev/null && \
echo " ✓ observer-detail.json"
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/observers/$OBSID/analytics" \
> "proto/testdata/node-fixtures/observer-analytics.json" 2>/dev/null && \
echo " ✓ observer-analytics.json"
else
echo " ⚠ No observers available — skipping observer detail endpoints"
fi
# Channel messages
docker exec meshcore-prod wget -qO- "http://localhost:3000/api/channels/public/messages?limit=5" \
> "proto/testdata/node-fixtures/channel-messages.json" 2>/dev/null && \
echo " ✓ channel-messages.json" || echo " ⚠ channel-messages failed"
# WebSocket message capture (capture one message if available)
# Non-blocking: if no live packets, skip with warning
echo " Capturing WebSocket message..."
if docker exec meshcore-prod timeout 5 node -e "
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:3000');
ws.on('message', (data) => {
console.log(data);
ws.close();
process.exit(0);
});
ws.on('error', () => { process.exit(1); });
" > "proto/testdata/node-fixtures/websocket-message.json" 2>/dev/null; then
echo " ✓ websocket-message.json"
else
echo " ⚠ websocket-message failed (no live packets) — skipping"
fi
echo ""
echo "Running proto validator..."
python3 tools/validate-protos.py || {
echo "❌ Proto validation failed — API contract drift detected"
echo "This means a Node.js API response doesn't match the proto definition."
echo "Fix by updating the .proto files in proto/ to match the actual API responses."
exit 1
}
echo "✅ Proto validation passed — API contract is consistent"
# ───────────────────────────────────────────────────────────────
# 6. Deploy Go Staging — start on port 82, healthcheck, smoke test
# ───────────────────────────────────────────────────────────────
deploy-go:
name: "🚀 Deploy Go Staging"
needs: [build-go]
runs-on: self-hosted
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Start Go staging on port 82
- name: Start staging on port 82
run: |
# Force remove stale containers
docker rm -f meshcore-staging-go 2>/dev/null || true
@@ -583,37 +308,37 @@ jobs:
fuser -k 82/tcp 2>/dev/null || true
docker compose --profile staging-go up -d staging-go
- name: Healthcheck Go staging container
- name: Healthcheck staging container
run: |
for i in $(seq 1 120); do
HEALTH=$(docker inspect meshcore-staging-go --format '{{.State.Health.Status}}' 2>/dev/null || echo "starting")
if [ "$HEALTH" = "healthy" ]; then
echo "Go staging healthy after ${i}s"
echo "Staging healthy after ${i}s"
break
fi
if [ "$i" -eq 120 ]; then
echo "Go staging failed health check after 120s"
echo "Staging failed health check after 120s"
docker logs meshcore-staging-go --tail 50
exit 1
fi
sleep 1
done
- name: Smoke test Go staging API
- name: Smoke test staging API
run: |
if curl -sf http://localhost:82/api/stats | grep -q engine; then
echo "Go staging verified — engine field present ✅"
echo "Staging verified — engine field present ✅"
else
echo "Go staging /api/stats did not return engine field"
echo "Staging /api/stats did not return engine field"
exit 1
fi
# ───────────────────────────────────────────────────────────────
# 7. Publish Badges & Summary — waits for both tracks to complete
# 5. Publish Badges & Summary
# ───────────────────────────────────────────────────────────────
publish:
name: "📝 Publish Badges & Summary"
needs: [deploy-node, deploy-go]
needs: [deploy]
runs-on: self-hosted
steps:
- name: Checkout code
@@ -648,8 +373,7 @@ jobs:
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Commit:** \`$(git rev-parse --short HEAD)\` — $(git log -1 --format=%s)" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "**Node Staging:** http://<VM_HOST>:81" >> $GITHUB_STEP_SUMMARY
echo "**Go Staging:** http://<VM_HOST>:82" >> $GITHUB_STEP_SUMMARY
echo "**Staging:** http://<VM_HOST>:82" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "To promote to production:" >> $GITHUB_STEP_SUMMARY
echo "\`\`\`bash" >> $GITHUB_STEP_SUMMARY


@@ -1,7 +1,7 @@
# MeshCore Analyzer
[![Backend Tests](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/backend-tests.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Backend Coverage](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/backend-coverage.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Go Server Coverage](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/go-server-coverage.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Go Ingestor Coverage](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/go-ingestor-coverage.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Frontend Tests](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/frontend-tests.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Frontend Coverage](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/.badges/frontend-coverage.json)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)
[![Deploy](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml/badge.svg)](https://github.com/Kpa-clawbot/meshcore-analyzer/actions/workflows/deploy.yml)


@@ -1,422 +1,469 @@
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"time"
_ "modernc.org/sqlite"
)
// Store wraps the SQLite database for packet ingestion.
type Store struct {
db *sql.DB
stmtGetTxByHash *sql.Stmt
stmtInsertTransmission *sql.Stmt
stmtUpdateTxFirstSeen *sql.Stmt
stmtInsertObservation *sql.Stmt
stmtUpsertNode *sql.Stmt
stmtIncrementAdvertCount *sql.Stmt
stmtUpsertObserver *sql.Stmt
stmtGetObserverRowid *sql.Stmt
}
// OpenStore opens or creates a SQLite DB at the given path, applying the
// v3 schema that is compatible with the Node.js server.
func OpenStore(dbPath string) (*Store, error) {
dir := filepath.Dir(dbPath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("creating data dir: %w", err)
}
db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)")
if err != nil {
return nil, fmt.Errorf("opening db: %w", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("pinging db: %w", err)
}
if err := applySchema(db); err != nil {
return nil, fmt.Errorf("applying schema: %w", err)
}
s := &Store{db: db}
if err := s.prepareStatements(); err != nil {
return nil, fmt.Errorf("preparing statements: %w", err)
}
return s, nil
}
func applySchema(db *sql.DB) error {
schema := `
CREATE TABLE IF NOT EXISTS nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS observers (
id TEXT PRIMARY KEY,
name TEXT,
iata TEXT,
last_seen TEXT,
first_seen TEXT,
packet_count INTEGER DEFAULT 0,
model TEXT,
firmware TEXT,
client_version TEXT,
radio TEXT,
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor INTEGER
);
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
CREATE TABLE IF NOT EXISTS inactive_nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
CREATE TABLE IF NOT EXISTS transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
`
if _, err := db.Exec(schema); err != nil {
return fmt.Errorf("base schema: %w", err)
}
// Create observations table (v3 schema)
obsExists := false
row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'")
var dummy string
if row.Scan(&dummy) == nil {
obsExists = true
}
if !obsExists {
obs := `
CREATE TABLE observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
observer_idx INTEGER,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp INTEGER NOT NULL
);
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
CREATE INDEX idx_observations_observer_idx ON observations(observer_idx);
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, ''));
`
if _, err := db.Exec(obs); err != nil {
return fmt.Errorf("observations schema: %w", err)
}
}
// One-time migration: recalculate advert_count to count unique transmissions only
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
var migDone int
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Recalculating advert_count (unique transmissions only)...")
db.Exec(`
UPDATE nodes SET advert_count = (
SELECT COUNT(*) FROM transmissions t
WHERE t.payload_type = 4
AND t.decoded_json LIKE '%' || nodes.public_key || '%'
)
`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`)
log.Println("[migration] advert_count recalculated")
}
return nil
}
func (s *Store) prepareStatements() error {
var err error
s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?")
if err != nil {
return err
}
s.stmtInsertTransmission, err = s.db.Prepare(`
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?")
if err != nil {
return err
}
s.stmtInsertObservation, err = s.db.Prepare(`
INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
s.stmtUpsertNode, err = s.db.Prepare(`
INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
name = COALESCE(?, name),
role = COALESCE(?, role),
lat = COALESCE(?, lat),
lon = COALESCE(?, lon),
last_seen = ?
`)
if err != nil {
return err
}
s.stmtIncrementAdvertCount, err = s.db.Prepare(`
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ?
`)
if err != nil {
return err
}
s.stmtUpsertObserver, err = s.db.Prepare(`
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(id) DO UPDATE SET
name = COALESCE(?, name),
iata = COALESCE(?, iata),
last_seen = ?,
packet_count = packet_count + 1
`)
if err != nil {
return err
}
s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?")
if err != nil {
return err
}
return nil
}
// InsertTransmission inserts a decoded packet into transmissions + observations.
// Returns true if a new transmission was created (not a duplicate hash).
func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
hash := data.Hash
if hash == "" {
return false, nil
}
now := data.Timestamp
if now == "" {
now = time.Now().UTC().Format(time.RFC3339)
}
var txID int64
isNew := false
// Check for existing transmission
var existingID int64
var existingFirstSeen string
err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen)
if err == nil {
// Existing transmission
txID = existingID
if now < existingFirstSeen {
_, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID)
}
} else {
// New transmission
isNew = true
result, err := s.stmtInsertTransmission.Exec(
data.RawHex, hash, now,
data.RouteType, data.PayloadType, data.PayloadVersion,
data.DecodedJSON,
)
if err != nil {
return false, fmt.Errorf("insert transmission: %w", err)
}
txID, _ = result.LastInsertId()
}
// Resolve observer_idx
var observerIdx *int64
if data.ObserverID != "" {
var rowid int64
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
if err == nil {
observerIdx = &rowid
}
}
// Insert observation
epochTs := time.Now().Unix()
if t, err := time.Parse(time.RFC3339, now); err == nil {
epochTs = t.Unix()
}
_, err = s.stmtInsertObservation.Exec(
txID, observerIdx, nil, // direction
data.SNR, data.RSSI, nil, // score
data.PathJSON, epochTs,
)
if err != nil {
log.Printf("[db] observation insert (non-fatal): %v", err)
}
return isNew, nil
}
// UpsertNode inserts or updates a node.
func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error {
now := lastSeen
if now == "" {
now = time.Now().UTC().Format(time.RFC3339)
}
_, err := s.stmtUpsertNode.Exec(
pubKey, name, role, lat, lon, now, now,
name, role, lat, lon, now,
)
return err
}
// IncrementAdvertCount increments advert_count for a node by public key.
func (s *Store) IncrementAdvertCount(pubKey string) error {
_, err := s.stmtIncrementAdvertCount.Exec(pubKey)
return err
}
// UpsertObserver inserts or updates an observer.
func (s *Store) UpsertObserver(id, name, iata string) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := s.stmtUpsertObserver.Exec(
id, name, iata, now, now,
name, iata, now,
)
return err
}
// Close closes the database.
func (s *Store) Close() error {
return s.db.Close()
}
// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table.
// Returns the number of nodes moved.
func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339)
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
_, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("insert inactive: %w", err)
}
result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("delete stale: %w", err)
}
moved, _ := result.RowsAffected()
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit: %w", err)
}
if moved > 0 {
log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays)
}
return moved, nil
}
// PacketData holds the data needed to insert a packet into the DB.
type PacketData struct {
RawHex string
Timestamp string
ObserverID string
ObserverName string
SNR *float64
RSSI *float64
Hash string
RouteType int
PayloadType int
PayloadVersion int
PathJSON string
DecodedJSON string
}
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
type MQTTPacketMessage struct {
Raw string `json:"raw"`
SNR *float64 `json:"SNR"`
RSSI *float64 `json:"RSSI"`
Origin string `json:"origin"`
}
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData {
now := time.Now().UTC().Format(time.RFC3339)
pathJSON := "[]"
if len(decoded.Path.Hops) > 0 {
b, _ := json.Marshal(decoded.Path.Hops)
pathJSON = string(b)
}
return &PacketData{
RawHex: msg.Raw,
Timestamp: now,
ObserverID: observerID,
ObserverName: msg.Origin,
SNR: msg.SNR,
RSSI: msg.RSSI,
Hash: ComputeContentHash(msg.Raw),
RouteType: decoded.Header.RouteType,
PayloadType: decoded.Header.PayloadType,
PayloadVersion: decoded.Header.PayloadVersion,
PathJSON: pathJSON,
DecodedJSON: PayloadJSON(&decoded.Payload),
}
}
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync/atomic"
"time"
_ "modernc.org/sqlite"
)
// DBStats tracks operational metrics for the ingestor database.
type DBStats struct {
TransmissionsInserted atomic.Int64
ObservationsInserted atomic.Int64
DuplicateTransmissions atomic.Int64
NodeUpserts atomic.Int64
ObserverUpserts atomic.Int64
WriteErrors atomic.Int64
}
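The DBStats counters rely on `sync/atomic` so that every write path can bump a metric without taking a lock. A minimal standalone sketch of the same pattern (the `counters` type and `run` helper here are illustrative, not part of the store):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters mirrors the DBStats pattern: one atomic.Int64 per metric,
// safe to bump from many goroutines without a mutex.
type counters struct {
	inserted atomic.Int64
	dupes    atomic.Int64
}

// run fires 20 writers of 50 increments each and returns the final totals.
func run() (int64, int64) {
	var c counters
	var wg sync.WaitGroup
	for g := 0; g < 20; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				c.inserted.Add(1)
				if i%2 == 0 { // pretend every other write is a duplicate
					c.dupes.Add(1)
				}
			}
		}()
	}
	wg.Wait()
	return c.inserted.Load(), c.dupes.Load()
}

func main() {
	ins, dup := run()
	fmt.Println(ins, dup) // 1000 500
}
```

`atomic.Int64` (Go 1.19+) keeps the counters race-free even when `LogStats` reads them while writers are still running.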
// Store wraps the SQLite database for packet ingestion.
type Store struct {
db *sql.DB
Stats DBStats
stmtGetTxByHash *sql.Stmt
stmtInsertTransmission *sql.Stmt
stmtUpdateTxFirstSeen *sql.Stmt
stmtInsertObservation *sql.Stmt
stmtUpsertNode *sql.Stmt
stmtIncrementAdvertCount *sql.Stmt
stmtUpsertObserver *sql.Stmt
stmtGetObserverRowid *sql.Stmt
}
// OpenStore opens or creates a SQLite DB at the given path, applying the
// v3 schema that is compatible with the Node.js server.
func OpenStore(dbPath string) (*Store, error) {
dir := filepath.Dir(dbPath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("creating data dir: %w", err)
}
db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)")
if err != nil {
return nil, fmt.Errorf("opening db: %w", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("pinging db: %w", err)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
log.Printf("SQLite config: busy_timeout=5000ms, max_open_conns=1, max_idle_conns=1, journal=WAL")
if err := applySchema(db); err != nil {
return nil, fmt.Errorf("applying schema: %w", err)
}
s := &Store{db: db}
if err := s.prepareStatements(); err != nil {
return nil, fmt.Errorf("preparing statements: %w", err)
}
return s, nil
}
func applySchema(db *sql.DB) error {
schema := `
CREATE TABLE IF NOT EXISTS nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS observers (
id TEXT PRIMARY KEY,
name TEXT,
iata TEXT,
last_seen TEXT,
first_seen TEXT,
packet_count INTEGER DEFAULT 0,
model TEXT,
firmware TEXT,
client_version TEXT,
radio TEXT,
battery_mv INTEGER,
uptime_secs INTEGER,
noise_floor INTEGER
);
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
CREATE TABLE IF NOT EXISTS inactive_nodes (
public_key TEXT PRIMARY KEY,
name TEXT,
role TEXT,
lat REAL,
lon REAL,
last_seen TEXT,
first_seen TEXT,
advert_count INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_inactive_nodes_last_seen ON inactive_nodes(last_seen);
CREATE TABLE IF NOT EXISTS transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
`
if _, err := db.Exec(schema); err != nil {
return fmt.Errorf("base schema: %w", err)
}
// Create observations table (v3 schema)
obsExists := false
row := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='observations'")
var dummy string
if row.Scan(&dummy) == nil {
obsExists = true
}
if !obsExists {
obs := `
CREATE TABLE observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
observer_idx INTEGER,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp INTEGER NOT NULL
);
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
CREATE INDEX idx_observations_observer_idx ON observations(observer_idx);
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS idx_observations_dedup ON observations(transmission_id, observer_idx, COALESCE(path_json, ''));
`
if _, err := db.Exec(obs); err != nil {
return fmt.Errorf("observations schema: %w", err)
}
}
// One-time migration: recalculate advert_count to count unique transmissions only
db.Exec(`CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)`)
var migDone int
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'advert_count_unique_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Recalculating advert_count (unique transmissions only)...")
db.Exec(`
UPDATE nodes SET advert_count = (
SELECT COUNT(*) FROM transmissions t
WHERE t.payload_type = 4
AND t.decoded_json LIKE '%' || nodes.public_key || '%'
)
`)
db.Exec(`INSERT INTO _migrations (name) VALUES ('advert_count_unique_v1')`)
log.Println("[migration] advert_count recalculated")
}
return nil
}
func (s *Store) prepareStatements() error {
var err error
s.stmtGetTxByHash, err = s.db.Prepare("SELECT id, first_seen FROM transmissions WHERE hash = ?")
if err != nil {
return err
}
s.stmtInsertTransmission, err = s.db.Prepare(`
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
s.stmtUpdateTxFirstSeen, err = s.db.Prepare("UPDATE transmissions SET first_seen = ? WHERE id = ?")
if err != nil {
return err
}
s.stmtInsertObservation, err = s.db.Prepare(`
INSERT OR IGNORE INTO observations (transmission_id, observer_idx, direction, snr, rssi, score, path_json, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
s.stmtUpsertNode, err = s.db.Prepare(`
INSERT INTO nodes (public_key, name, role, lat, lon, last_seen, first_seen)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
name = COALESCE(?, name),
role = COALESCE(?, role),
lat = COALESCE(?, lat),
lon = COALESCE(?, lon),
last_seen = ?
`)
if err != nil {
return err
}
s.stmtIncrementAdvertCount, err = s.db.Prepare(`
UPDATE nodes SET advert_count = advert_count + 1 WHERE public_key = ?
`)
if err != nil {
return err
}
s.stmtUpsertObserver, err = s.db.Prepare(`
INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(id) DO UPDATE SET
name = COALESCE(?, name),
iata = COALESCE(?, iata),
last_seen = ?,
packet_count = packet_count + 1
`)
if err != nil {
return err
}
s.stmtGetObserverRowid, err = s.db.Prepare("SELECT rowid FROM observers WHERE id = ?")
if err != nil {
return err
}
return nil
}
// InsertTransmission inserts a decoded packet into transmissions + observations.
// Returns true if a new transmission was created (not a duplicate hash).
func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
hash := data.Hash
if hash == "" {
return false, nil
}
now := data.Timestamp
if now == "" {
now = time.Now().UTC().Format(time.RFC3339)
}
var txID int64
isNew := false
// Check for existing transmission
var existingID int64
var existingFirstSeen string
err := s.stmtGetTxByHash.QueryRow(hash).Scan(&existingID, &existingFirstSeen)
if err == nil {
// Existing transmission
txID = existingID
if now < existingFirstSeen {
_, _ = s.stmtUpdateTxFirstSeen.Exec(now, txID)
}
} else if err != sql.ErrNoRows {
// Real lookup failure — don't misclassify it as a new transmission
s.Stats.WriteErrors.Add(1)
return false, fmt.Errorf("lookup transmission: %w", err)
} else {
// New transmission
isNew = true
result, err := s.stmtInsertTransmission.Exec(
data.RawHex, hash, now,
data.RouteType, data.PayloadType, data.PayloadVersion,
data.DecodedJSON,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
return false, fmt.Errorf("insert transmission: %w", err)
}
txID, _ = result.LastInsertId()
s.Stats.TransmissionsInserted.Add(1)
}
if !isNew {
s.Stats.DuplicateTransmissions.Add(1)
}
// Resolve observer_idx
var observerIdx *int64
if data.ObserverID != "" {
var rowid int64
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
if err == nil {
observerIdx = &rowid
}
}
// Insert observation
epochTs := time.Now().Unix()
if t, err := time.Parse(time.RFC3339, now); err == nil {
epochTs = t.Unix()
}
_, err = s.stmtInsertObservation.Exec(
txID, observerIdx, nil, // direction
data.SNR, data.RSSI, nil, // score
data.PathJSON, epochTs,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
log.Printf("[db] observation insert (non-fatal): %v", err)
} else {
s.Stats.ObservationsInserted.Add(1)
}
return isNew, nil
}
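The observation timestamp handling above parses the message's RFC3339 string and silently falls back to the current time when it doesn't parse. That conversion can be sketched in isolation; `toEpoch` is a hypothetical helper for illustration, not a method of the store:

```go
package main

import (
	"fmt"
	"time"
)

// toEpoch converts an RFC3339 timestamp to a Unix epoch, falling back to
// time.Now() when the string does not parse — the same fallback used when
// building an observation row.
func toEpoch(ts string) int64 {
	epoch := time.Now().Unix()
	if t, err := time.Parse(time.RFC3339, ts); err == nil {
		epoch = t.Unix()
	}
	return epoch
}

func main() {
	fmt.Println(toEpoch("1970-01-01T00:00:01Z")) // 1
	fmt.Println(toEpoch("not-a-timestamp") > 0)  // true (falls back to now)
}
```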
// UpsertNode inserts or updates a node.
func (s *Store) UpsertNode(pubKey, name, role string, lat, lon *float64, lastSeen string) error {
now := lastSeen
if now == "" {
now = time.Now().UTC().Format(time.RFC3339)
}
_, err := s.stmtUpsertNode.Exec(
pubKey, name, role, lat, lon, now, now,
name, role, lat, lon, now,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
} else {
s.Stats.NodeUpserts.Add(1)
}
return err
}
// IncrementAdvertCount increments advert_count for a node by public key.
func (s *Store) IncrementAdvertCount(pubKey string) error {
_, err := s.stmtIncrementAdvertCount.Exec(pubKey)
return err
}
// UpsertObserver inserts or updates an observer.
func (s *Store) UpsertObserver(id, name, iata string) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := s.stmtUpsertObserver.Exec(
id, name, iata, now, now,
name, iata, now,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
} else {
s.Stats.ObserverUpserts.Add(1)
}
return err
}
// Close closes the database.
func (s *Store) Close() error {
return s.db.Close()
}
// LogStats logs current operational metrics.
func (s *Store) LogStats() {
log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.NodeUpserts.Load(),
s.Stats.ObserverUpserts.Load(),
s.Stats.WriteErrors.Load(),
)
}
// MoveStaleNodes moves nodes not seen in nodeDays to the inactive_nodes table.
// Returns the number of nodes moved.
func (s *Store) MoveStaleNodes(nodeDays int) (int64, error) {
cutoff := time.Now().UTC().AddDate(0, 0, -nodeDays).Format(time.RFC3339)
tx, err := s.db.Begin()
if err != nil {
return 0, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
_, err = tx.Exec(`INSERT OR REPLACE INTO inactive_nodes SELECT * FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("insert inactive: %w", err)
}
result, err := tx.Exec(`DELETE FROM nodes WHERE last_seen < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("delete stale: %w", err)
}
moved, _ := result.RowsAffected()
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit: %w", err)
}
if moved > 0 {
log.Printf("Moved %d node(s) to inactive_nodes (not seen in %d days)", moved, nodeDays)
}
return moved, nil
}
// PacketData holds the data needed to insert a packet into the DB.
type PacketData struct {
RawHex string
Timestamp string
ObserverID string
ObserverName string
SNR *float64
RSSI *float64
Hash string
RouteType int
PayloadType int
PayloadVersion int
PathJSON string
DecodedJSON string
}
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
type MQTTPacketMessage struct {
Raw string `json:"raw"`
SNR *float64 `json:"SNR"`
RSSI *float64 `json:"RSSI"`
Origin string `json:"origin"`
}
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, region string) *PacketData {
now := time.Now().UTC().Format(time.RFC3339)
pathJSON := "[]"
if len(decoded.Path.Hops) > 0 {
b, _ := json.Marshal(decoded.Path.Hops)
pathJSON = string(b)
}
return &PacketData{
RawHex: msg.Raw,
Timestamp: now,
ObserverID: observerID,
ObserverName: msg.Origin,
SNR: msg.SNR,
RSSI: msg.RSSI,
Hash: ComputeContentHash(msg.Raw),
RouteType: decoded.Header.RouteType,
PayloadType: decoded.Header.PayloadType,
PayloadVersion: decoded.Header.PayloadVersion,
PathJSON: pathJSON,
DecodedJSON: PayloadJSON(&decoded.Payload),
}
}


@@ -1,10 +1,14 @@
package main
import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
)
func tempDBPath(t *testing.T) string {
@@ -626,3 +630,306 @@ func TestSchemaCompatibility(t *testing.T) {
}
}
}
func TestConcurrentWrites(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Pre-create an observer for observer_idx resolution
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil {
t.Fatal(err)
}
const goroutines = 20
const writesPerGoroutine = 50
errCh := make(chan error, goroutines*writesPerGoroutine)
done := make(chan struct{})
for g := 0; g < goroutines; g++ {
go func(gIdx int) {
defer func() { done <- struct{}{} }()
for i := 0; i < writesPerGoroutine; i++ {
hash := fmt.Sprintf("concurrent_%d_%d_____", gIdx, i) // pad to 16+ chars
snr := 5.0
rssi := -100.0
data := &PacketData{
RawHex: "0A00D69F",
Timestamp: time.Now().UTC().Format(time.RFC3339),
ObserverID: "obs1",
Hash: hash[:16],
RouteType: 2,
PayloadType: 4, // ADVERT
PathJSON: "[]",
DecodedJSON: `{"type":"ADVERT"}`,
SNR: &snr,
RSSI: &rssi,
}
if _, err := s.InsertTransmission(data); err != nil {
errCh <- fmt.Errorf("goroutine %d write %d: %w", gIdx, i, err)
return
}
// Also do node + observer upserts to simulate full pipeline
lat := 37.0
lon := -122.0
pubKey := fmt.Sprintf("node_%d_%d________", gIdx, i)
if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil {
errCh <- fmt.Errorf("goroutine %d node upsert %d: %w", gIdx, i, err)
return
}
obsID := fmt.Sprintf("obs_%d_%d__________", gIdx, i)
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil {
errCh <- fmt.Errorf("goroutine %d observer upsert %d: %w", gIdx, i, err)
return
}
}
}(g)
}
// Wait for all goroutines
for g := 0; g < goroutines; g++ {
<-done
}
close(errCh)
var errors []error
for err := range errCh {
errors = append(errors, err)
}
if len(errors) > 0 {
t.Errorf("got %d errors from %d concurrent writers (first: %v)", len(errors), goroutines, errors[0])
}
// Verify data integrity
var txCount, obsCount, nodeCount, observerCount int
s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
s.db.QueryRow("SELECT COUNT(*) FROM observations").Scan(&obsCount)
s.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&nodeCount)
s.db.QueryRow("SELECT COUNT(*) FROM observers").Scan(&observerCount)
expectedTx := goroutines * writesPerGoroutine
if txCount != expectedTx {
t.Errorf("transmissions count=%d, want %d", txCount, expectedTx)
}
if obsCount != expectedTx {
t.Errorf("observations count=%d, want %d", obsCount, expectedTx)
}
t.Logf("Concurrent write test: %d goroutines × %d writes = %d total, %d errors",
goroutines, writesPerGoroutine, goroutines*writesPerGoroutine, len(errors))
t.Logf("Stats: tx_inserted=%d tx_dupes=%d obs_inserted=%d write_errors=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.WriteErrors.Load(),
)
}
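The error-collection shape the test uses (a buffered error channel sized for the worst case, a done signal per goroutine, then drain after close) is a standard fan-out/fan-in pattern. A minimal standalone sketch, with a hypothetical `fanOut` helper standing in for the test body:

```go
package main

import "fmt"

// fanOut runs n workers, each reporting at most one error on errCh, and
// returns every error collected after all workers signal done.
func fanOut(n int) []error {
	errCh := make(chan error, n) // buffered: sends never block
	done := make(chan struct{})
	for g := 0; g < n; g++ {
		go func(id int) {
			defer func() { done <- struct{}{} }()
			if id%5 == 0 { // pretend every fifth worker fails
				errCh <- fmt.Errorf("worker %d failed", id)
			}
		}(g)
	}
	for g := 0; g < n; g++ {
		<-done
	}
	close(errCh) // safe: all senders have exited
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	errs := fanOut(20)
	fmt.Println(len(errs)) // 4 (workers 0, 5, 10, 15)
}
```

Closing the channel only after every worker has signalled done is what makes the final `range errCh` drain safe.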
func TestDBStats(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Initial stats should be zero
if s.Stats.TransmissionsInserted.Load() != 0 {
t.Error("initial TransmissionsInserted should be 0")
}
if s.Stats.WriteErrors.Load() != 0 {
t.Error("initial WriteErrors should be 0")
}
// Insert a transmission
data := &PacketData{
RawHex: "0A00D69F",
Timestamp: "2026-03-28T00:00:00Z",
Hash: "stats_test_12345",
RouteType: 2,
PathJSON: "[]",
}
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}
if s.Stats.TransmissionsInserted.Load() != 1 {
t.Errorf("TransmissionsInserted=%d, want 1", s.Stats.TransmissionsInserted.Load())
}
if s.Stats.ObservationsInserted.Load() != 1 {
t.Errorf("ObservationsInserted=%d, want 1", s.Stats.ObservationsInserted.Load())
}
// Insert duplicate
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}
if s.Stats.DuplicateTransmissions.Load() != 1 {
t.Errorf("DuplicateTransmissions=%d, want 1", s.Stats.DuplicateTransmissions.Load())
}
// Node upsert
lat := 37.0
lon := -122.0
if err := s.UpsertNode("pk1", "Node1", "repeater", &lat, &lon, "2026-03-28T00:00:00Z"); err != nil {
t.Fatal(err)
}
if s.Stats.NodeUpserts.Load() != 1 {
t.Errorf("NodeUpserts=%d, want 1", s.Stats.NodeUpserts.Load())
}
// Observer upsert
if err := s.UpsertObserver("obs1", "Obs1", "SJC"); err != nil {
t.Fatal(err)
}
if s.Stats.ObserverUpserts.Load() != 1 {
t.Errorf("ObserverUpserts=%d, want 1", s.Stats.ObserverUpserts.Load())
}
// LogStats should not panic
s.LogStats()
}
func TestLoadTestThroughput(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Pre-create observer
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil {
t.Fatal(err)
}
const totalMessages = 1000
const goroutines = 20
perGoroutine := totalMessages / goroutines
// Simulate full pipeline: InsertTransmission + UpsertNode + UpsertObserver + IncrementAdvertCount
// This matches the real handleMessage write pattern for ADVERT packets
latencies := make([]time.Duration, totalMessages)
var busyErrors atomic.Int64
var totalErrors atomic.Int64
errCh := make(chan error, totalMessages)
done := make(chan struct{})
start := time.Now()
for g := 0; g < goroutines; g++ {
go func(gIdx int) {
defer func() { done <- struct{}{} }()
for i := 0; i < perGoroutine; i++ {
msgStart := time.Now()
idx := gIdx*perGoroutine + i
hash := fmt.Sprintf("load_%04d_%04d____", gIdx, i)
snr := 5.0
rssi := -100.0
data := &PacketData{
RawHex: "0A00D69F",
Timestamp: time.Now().UTC().Format(time.RFC3339),
ObserverID: "obs1",
Hash: hash[:16],
RouteType: 2,
PayloadType: 4,
PathJSON: "[]",
DecodedJSON: `{"type":"ADVERT","pubKey":"` + hash[:16] + `"}`,
SNR: &snr,
RSSI: &rssi,
}
_, err := s.InsertTransmission(data)
if err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY") {
busyErrors.Add(1)
}
errCh <- err
continue
}
lat := 37.0 + float64(gIdx)*0.001
lon := -122.0 + float64(i)*0.001
pubKey := fmt.Sprintf("node_%04d_%04d____", gIdx, i)
if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
busyErrors.Add(1)
}
}
if err := s.IncrementAdvertCount(pubKey[:16]); err != nil {
totalErrors.Add(1)
}
obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i)
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
busyErrors.Add(1)
}
}
latencies[idx] = time.Since(msgStart)
}
}(g)
}
for g := 0; g < goroutines; g++ {
<-done
}
close(errCh)
elapsed := time.Since(start)
// Calculate p50, p95, p99
validLatencies := make([]time.Duration, 0, totalMessages)
for _, l := range latencies {
if l > 0 {
validLatencies = append(validLatencies, l)
}
}
sort.Slice(validLatencies, func(i, j int) bool { return validLatencies[i] < validLatencies[j] })
p50 := validLatencies[len(validLatencies)*50/100]
p95 := validLatencies[len(validLatencies)*95/100]
p99 := validLatencies[len(validLatencies)*99/100]
msgsPerSec := float64(totalMessages) / elapsed.Seconds()
t.Logf("=== LOAD TEST RESULTS ===")
t.Logf("Messages: %d (%d goroutines × %d each)", totalMessages, goroutines, perGoroutine)
t.Logf("Writes/msg: 4 (InsertTx + UpsertNode + IncrAdvertCount + UpsertObserver)")
t.Logf("Total writes: %d", totalMessages*4)
t.Logf("Duration: %s", elapsed.Round(time.Millisecond))
t.Logf("Throughput: %.1f msgs/sec (%.1f writes/sec)", msgsPerSec, msgsPerSec*4)
t.Logf("Latency p50: %s", p50.Round(time.Microsecond))
t.Logf("Latency p95: %s", p95.Round(time.Microsecond))
t.Logf("Latency p99: %s", p99.Round(time.Microsecond))
t.Logf("SQLITE_BUSY: %d", busyErrors.Load())
t.Logf("Total errors: %d", totalErrors.Load())
t.Logf("Stats: tx=%d dupes=%d obs=%d nodes=%d observers=%d write_err=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.NodeUpserts.Load(),
s.Stats.ObserverUpserts.Load(),
s.Stats.WriteErrors.Load(),
)
// Hard assertions
if busyErrors.Load() > 0 {
t.Errorf("SQLITE_BUSY errors: %d (expected 0)", busyErrors.Load())
}
if totalErrors.Load() > 0 {
t.Errorf("Total errors: %d (expected 0)", totalErrors.Load())
}
var txCount int
s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
if txCount != totalMessages {
t.Errorf("transmissions=%d, want %d", txCount, totalMessages)
}
}


@@ -1,491 +1,501 @@
package main
import (
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
configPath := flag.String("config", "config.json", "path to config file")
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[ingestor] ")
cfg, err := LoadConfig(*configPath)
if err != nil {
log.Fatalf("config: %v", err)
}
sources := cfg.ResolvedSources()
if len(sources) == 0 {
log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var")
}
store, err := OpenStore(cfg.DBPath)
if err != nil {
log.Fatalf("db: %v", err)
}
defer store.Close()
log.Printf("SQLite opened: %s", cfg.DBPath)
// Node retention: move stale nodes to inactive_nodes on startup
nodeDays := cfg.NodeDaysOrDefault()
store.MoveStaleNodes(nodeDays)
// Hourly ticker for node retention
retentionTicker := time.NewTicker(1 * time.Hour)
go func() {
for range retentionTicker.C {
store.MoveStaleNodes(nodeDays)
}
}()
channelKeys := loadChannelKeys(cfg, *configPath)
if len(channelKeys) > 0 {
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
} else {
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
}
// Connect to each MQTT source
var clients []mqtt.Client
for _, source := range sources {
tag := source.Name
if tag == "" {
tag = source.Broker
}
opts := mqtt.NewClientOptions().
AddBroker(source.Broker).
SetAutoReconnect(true).
SetConnectRetry(true).
SetOrderMatters(false)
if source.Username != "" {
opts.SetUsername(source.Username)
}
if source.Password != "" {
opts.SetPassword(source.Password)
}
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
} else if strings.HasPrefix(source.Broker, "ssl://") {
opts.SetTLSConfig(&tls.Config{})
}
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
topics := source.Topics
if len(topics) == 0 {
topics = []string{"meshcore/#"}
}
for _, t := range topics {
token := c.Subscribe(t, 0, nil)
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
} else {
log.Printf("MQTT [%s] subscribed to %s", tag, t)
}
}
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT [%s] disconnected: %v", tag, err)
})
// Capture source for closure
src := source
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
handleMessage(store, tag, src, m, channelKeys)
})
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
continue
}
clients = append(clients, client)
}
if len(clients) == 0 {
log.Fatal("no MQTT connections established")
}
log.Printf("Running — %d MQTT source(s) connected", len(clients))
// Wait for shutdown signal
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
log.Println("Shutting down...")
retentionTicker.Stop()
for _, c := range clients {
c.Disconnect(1000)
}
log.Println("Done.")
}
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) {
defer func() {
if r := recover(); r != nil {
log.Printf("MQTT [%s] panic in handler: %v", tag, r)
}
}()
topic := m.Topic()
parts := strings.Split(topic, "/")
// IATA filter
if len(source.IATAFilter) > 0 && len(parts) > 1 {
region := parts[1]
matched := false
for _, f := range source.IATAFilter {
if f == region {
matched = true
break
}
}
if !matched {
return
}
}
var msg map[string]interface{}
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
return
}
// Skip status/connection topics
if topic == "meshcore/status" || topic == "meshcore/events/connection" {
return
}
// Status topic: meshcore/<region>/<observer_id>/status
if len(parts) >= 4 && parts[3] == "status" {
observerID := parts[2]
name, _ := msg["origin"].(string)
iata := parts[1]
if err := store.UpsertObserver(observerID, name, iata); err != nil {
log.Printf("MQTT [%s] observer status error: %v", tag, err)
}
log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata)
return
}
// Format 1: Raw packet (meshcoretomqtt / Cisien format)
rawHex, _ := msg["raw"].(string)
if rawHex != "" {
decoded, err := DecodePacket(rawHex, channelKeys)
if err != nil {
log.Printf("MQTT [%s] decode error: %v", tag, err)
return
}
observerID := ""
region := ""
if len(parts) > 2 {
observerID = parts[2]
}
if len(parts) > 1 {
region = parts[1]
}
mqttMsg := &MQTTPacketMessage{Raw: rawHex}
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
mqttMsg.SNR = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
mqttMsg.RSSI = &f
}
}
if v, ok := msg["origin"].(string); ok {
mqttMsg.Origin = v
}
pktData := BuildPacketData(mqttMsg, decoded, observerID, region)
isNew, err := store.InsertTransmission(pktData)
if err != nil {
log.Printf("MQTT [%s] db insert error: %v", tag, err)
}
// Process ADVERT → upsert node
if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" {
ok, reason := ValidateAdvert(&decoded.Payload)
if ok {
role := advertRole(decoded.Payload.Flags)
if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil {
log.Printf("MQTT [%s] node upsert error: %v", tag, err)
}
if isNew {
if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil {
log.Printf("MQTT [%s] advert count error: %v", tag, err)
}
}
} else {
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
}
}
// Upsert observer
if observerID != "" {
origin, _ := msg["origin"].(string)
if err := store.UpsertObserver(observerID, origin, region); err != nil {
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
}
}
return
}
// Format 2: Companion bridge channel message (meshcore/message/channel/<n>)
if strings.HasPrefix(topic, "meshcore/message/channel/") {
text, _ := msg["text"].(string)
if text == "" {
return
}
channelIdx := ""
if len(parts) >= 4 {
channelIdx = parts[3]
}
if ci, ok := msg["channel_idx"]; ok {
channelIdx = fmt.Sprintf("%v", ci)
}
// Extract sender from "Name: message" format
sender := ""
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
sender = text[:idx]
}
channelName := fmt.Sprintf("ch%s", channelIdx)
// Build decoded JSON matching Node.js CHAN format
channelMsg := map[string]interface{}{
"type": "CHAN",
"channel": channelName,
"text": text,
"sender": sender,
}
if st, ok := msg["sender_timestamp"]; ok {
channelMsg["sender_timestamp"] = st
}
decodedJSON, _ := json.Marshal(channelMsg)
now := time.Now().UTC().Format(time.RFC3339)
hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now)
h := sha256.Sum256([]byte(hashInput))
hash := hex.EncodeToString(h[:])[:16]
var snr, rssi *float64
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
} else if v, ok := msg["snr"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
} else if v, ok := msg["rssi"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
}
pktData := &PacketData{
Timestamp: now,
ObserverID: "companion",
ObserverName: "L1 Pro (BLE)",
SNR: snr,
RSSI: rssi,
Hash: hash,
RouteType: 1, // FLOOD
PayloadType: 5, // GRP_TXT
PathJSON: "[]",
DecodedJSON: string(decodedJSON),
}
if _, err := store.InsertTransmission(pktData); err != nil {
log.Printf("MQTT [%s] channel insert error: %v", tag, err)
}
// Upsert sender as a companion node
if sender != "" {
senderKey := "sender-" + strings.ToLower(sender)
if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil {
log.Printf("MQTT [%s] sender node upsert error: %v", tag, err)
}
}
log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown"))
return
}
// Format 2b: Companion bridge direct message (meshcore/message/direct/<id>)
if strings.HasPrefix(topic, "meshcore/message/direct/") {
text, _ := msg["text"].(string)
if text == "" {
return
}
sender := ""
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
sender = text[:idx]
}
dm := map[string]interface{}{
"type": "DM",
"text": text,
"sender": sender,
}
if st, ok := msg["sender_timestamp"]; ok {
dm["sender_timestamp"] = st
}
decodedJSON, _ := json.Marshal(dm)
now := time.Now().UTC().Format(time.RFC3339)
hashInput := fmt.Sprintf("dm:%s:%s", text, now)
h := sha256.Sum256([]byte(hashInput))
hash := hex.EncodeToString(h[:])[:16]
var snr, rssi *float64
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
} else if v, ok := msg["snr"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
} else if v, ok := msg["rssi"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
}
pktData := &PacketData{
Timestamp: now,
ObserverID: "companion",
ObserverName: "L1 Pro (BLE)",
SNR: snr,
RSSI: rssi,
Hash: hash,
RouteType: 1, // FLOOD
PayloadType: 2, // TXT_MSG
PathJSON: "[]",
DecodedJSON: string(decodedJSON),
}
if _, err := store.InsertTransmission(pktData); err != nil {
log.Printf("MQTT [%s] DM insert error: %v", tag, err)
}
log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown"))
return
}
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int64:
return float64(n), true
case json.Number:
f, err := n.Float64()
return f, err == nil
default:
return 0, false
}
}
func firstNonEmpty(vals ...string) string {
for _, v := range vals {
if v != "" {
return v
}
}
return ""
}
// loadChannelKeys loads channel decryption keys from config and/or a JSON file.
// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config.
func loadChannelKeys(cfg *Config, configPath string) map[string]string {
keys := make(map[string]string)
// Determine file path for rainbow keys
keysPath := os.Getenv("CHANNEL_KEYS_PATH")
if keysPath == "" {
keysPath = cfg.ChannelKeysPath
}
if keysPath == "" {
// Default: look for channel-rainbow.json next to config file
keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json")
}
if data, err := os.ReadFile(keysPath); err == nil {
var fileKeys map[string]string
if err := json.Unmarshal(data, &fileKeys); err == nil {
for k, v := range fileKeys {
keys[k] = v
}
log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath)
} else {
log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err)
}
}
// Merge inline config keys (override file keys)
for k, v := range cfg.ChannelKeys {
keys[k] = v
}
return keys
}
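The merge order in `loadChannelKeys` is what gives inline config keys priority over the rainbow file: file keys land first, then config keys overwrite on collision. The precedence can be sketched with a hypothetical `mergeKeys` helper:

```go
package main

import "fmt"

// mergeKeys applies the same precedence as loadChannelKeys: start from the
// file's keys, then let inline config keys override on collision.
func mergeKeys(fileKeys, cfgKeys map[string]string) map[string]string {
	keys := make(map[string]string, len(fileKeys)+len(cfgKeys))
	for k, v := range fileKeys {
		keys[k] = v
	}
	for k, v := range cfgKeys { // config wins on duplicate channel names
		keys[k] = v
	}
	return keys
}

func main() {
	file := map[string]string{"public": "aaaa", "ops": "bbbb"}
	cfg := map[string]string{"ops": "cccc"}
	merged := mergeKeys(file, cfg)
	fmt.Println(merged["public"], merged["ops"]) // aaaa cccc
}
```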
// Version info (set via ldflags)
var version = "dev"
func init() {
if len(os.Args) > 1 && os.Args[1] == "--version" {
fmt.Println("meshcore-ingestor", version)
os.Exit(0)
}
}
package main
import (
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
configPath := flag.String("config", "config.json", "path to config file")
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[ingestor] ")
cfg, err := LoadConfig(*configPath)
if err != nil {
log.Fatalf("config: %v", err)
}
sources := cfg.ResolvedSources()
if len(sources) == 0 {
log.Fatal("no MQTT sources configured — set mqttSources in config or MQTT_BROKER env var")
}
store, err := OpenStore(cfg.DBPath)
if err != nil {
log.Fatalf("db: %v", err)
}
defer store.Close()
log.Printf("SQLite opened: %s", cfg.DBPath)
// Node retention: move stale nodes to inactive_nodes on startup
nodeDays := cfg.NodeDaysOrDefault()
store.MoveStaleNodes(nodeDays)
// Hourly ticker for node retention
retentionTicker := time.NewTicker(1 * time.Hour)
go func() {
for range retentionTicker.C {
store.MoveStaleNodes(nodeDays)
}
}()
// Periodic stats logging (every 5 minutes)
statsTicker := time.NewTicker(5 * time.Minute)
go func() {
for range statsTicker.C {
store.LogStats()
}
}()
channelKeys := loadChannelKeys(cfg, *configPath)
if len(channelKeys) > 0 {
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
} else {
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
}
// Connect to each MQTT source
var clients []mqtt.Client
for _, source := range sources {
tag := source.Name
if tag == "" {
tag = source.Broker
}
opts := mqtt.NewClientOptions().
AddBroker(source.Broker).
SetAutoReconnect(true).
SetConnectRetry(true).
SetOrderMatters(true)
if source.Username != "" {
opts.SetUsername(source.Username)
}
if source.Password != "" {
opts.SetPassword(source.Password)
}
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
} else if strings.HasPrefix(source.Broker, "ssl://") {
opts.SetTLSConfig(&tls.Config{})
}
// Capture per-iteration values for the async handlers below: `source` is the
// range variable and is reused across iterations (pre-Go 1.22), so closures
// must reference this copy rather than `source` itself.
src := source
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, src.Broker)
topics := src.Topics
if len(topics) == 0 {
topics = []string{"meshcore/#"}
}
for _, t := range topics {
token := c.Subscribe(t, 0, nil)
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
} else {
log.Printf("MQTT [%s] subscribed to %s", tag, t)
}
}
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT [%s] disconnected: %v", tag, err)
})
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
handleMessage(store, tag, src, m, channelKeys)
})
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
continue
}
clients = append(clients, client)
}
if len(clients) == 0 {
log.Fatal("no MQTT connections established")
}
log.Printf("Running — %d MQTT source(s) connected", len(clients))
// Wait for shutdown signal
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
log.Println("Shutting down...")
retentionTicker.Stop()
statsTicker.Stop()
store.LogStats() // final stats on shutdown
for _, c := range clients {
c.Disconnect(1000)
}
log.Println("Done.")
}
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string) {
defer func() {
if r := recover(); r != nil {
log.Printf("MQTT [%s] panic in handler: %v", tag, r)
}
}()
topic := m.Topic()
parts := strings.Split(topic, "/")
// IATA filter
if len(source.IATAFilter) > 0 && len(parts) > 1 {
region := parts[1]
matched := false
for _, f := range source.IATAFilter {
if f == region {
matched = true
break
}
}
if !matched {
return
}
}
var msg map[string]interface{}
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
return
}
// Skip status/connection topics
if topic == "meshcore/status" || topic == "meshcore/events/connection" {
return
}
// Status topic: meshcore/<region>/<observer_id>/status
if len(parts) >= 4 && parts[3] == "status" {
observerID := parts[2]
name, _ := msg["origin"].(string)
iata := parts[1]
if err := store.UpsertObserver(observerID, name, iata); err != nil {
log.Printf("MQTT [%s] observer status error: %v", tag, err)
}
log.Printf("MQTT [%s] status: %s (%s)", tag, firstNonEmpty(name, observerID), iata)
return
}
// Format 1: Raw packet (meshcoretomqtt / Cisien format)
rawHex, _ := msg["raw"].(string)
if rawHex != "" {
decoded, err := DecodePacket(rawHex, channelKeys)
if err != nil {
log.Printf("MQTT [%s] decode error: %v", tag, err)
return
}
observerID := ""
region := ""
if len(parts) > 2 {
observerID = parts[2]
}
if len(parts) > 1 {
region = parts[1]
}
mqttMsg := &MQTTPacketMessage{Raw: rawHex}
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
mqttMsg.SNR = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
mqttMsg.RSSI = &f
}
}
if v, ok := msg["origin"].(string); ok {
mqttMsg.Origin = v
}
pktData := BuildPacketData(mqttMsg, decoded, observerID, region)
isNew, err := store.InsertTransmission(pktData)
if err != nil {
log.Printf("MQTT [%s] db insert error: %v", tag, err)
}
// Process ADVERT → upsert node
if decoded.Header.PayloadTypeName == "ADVERT" && decoded.Payload.PubKey != "" {
ok, reason := ValidateAdvert(&decoded.Payload)
if ok {
role := advertRole(decoded.Payload.Flags)
if err := store.UpsertNode(decoded.Payload.PubKey, decoded.Payload.Name, role, decoded.Payload.Lat, decoded.Payload.Lon, pktData.Timestamp); err != nil {
log.Printf("MQTT [%s] node upsert error: %v", tag, err)
}
if isNew {
if err := store.IncrementAdvertCount(decoded.Payload.PubKey); err != nil {
log.Printf("MQTT [%s] advert count error: %v", tag, err)
}
}
} else {
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
}
}
// Upsert observer
if observerID != "" {
origin, _ := msg["origin"].(string)
if err := store.UpsertObserver(observerID, origin, region); err != nil {
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
}
}
return
}
// Format 2: Companion bridge channel message (meshcore/message/channel/<n>)
if strings.HasPrefix(topic, "meshcore/message/channel/") {
text, _ := msg["text"].(string)
if text == "" {
return
}
channelIdx := ""
if len(parts) >= 4 {
channelIdx = parts[3]
}
if ci, ok := msg["channel_idx"]; ok {
channelIdx = fmt.Sprintf("%v", ci)
}
// Extract sender from "Name: message" format
sender := ""
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
sender = text[:idx]
}
channelName := fmt.Sprintf("ch%s", channelIdx)
// Build decoded JSON matching Node.js CHAN format
channelMsg := map[string]interface{}{
"type": "CHAN",
"channel": channelName,
"text": text,
"sender": sender,
}
if st, ok := msg["sender_timestamp"]; ok {
channelMsg["sender_timestamp"] = st
}
decodedJSON, _ := json.Marshal(channelMsg)
now := time.Now().UTC().Format(time.RFC3339)
hashInput := fmt.Sprintf("ch:%s:%s:%s", channelIdx, text, now)
h := sha256.Sum256([]byte(hashInput))
hash := hex.EncodeToString(h[:])[:16]
var snr, rssi *float64
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
} else if v, ok := msg["snr"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
} else if v, ok := msg["rssi"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
}
pktData := &PacketData{
Timestamp: now,
ObserverID: "companion",
ObserverName: "L1 Pro (BLE)",
SNR: snr,
RSSI: rssi,
Hash: hash,
RouteType: 1, // FLOOD
PayloadType: 5, // GRP_TXT
PathJSON: "[]",
DecodedJSON: string(decodedJSON),
}
if _, err := store.InsertTransmission(pktData); err != nil {
log.Printf("MQTT [%s] channel insert error: %v", tag, err)
}
// Upsert sender as a companion node
if sender != "" {
senderKey := "sender-" + strings.ToLower(sender)
if err := store.UpsertNode(senderKey, sender, "companion", nil, nil, now); err != nil {
log.Printf("MQTT [%s] sender node upsert error: %v", tag, err)
}
}
log.Printf("MQTT [%s] channel message: ch%s from %s", tag, channelIdx, firstNonEmpty(sender, "unknown"))
return
}
// Format 2b: Companion bridge direct message (meshcore/message/direct/<id>)
if strings.HasPrefix(topic, "meshcore/message/direct/") {
text, _ := msg["text"].(string)
if text == "" {
return
}
sender := ""
if idx := strings.Index(text, ": "); idx > 0 && idx < 50 {
sender = text[:idx]
}
dm := map[string]interface{}{
"type": "DM",
"text": text,
"sender": sender,
}
if st, ok := msg["sender_timestamp"]; ok {
dm["sender_timestamp"] = st
}
decodedJSON, _ := json.Marshal(dm)
now := time.Now().UTC().Format(time.RFC3339)
hashInput := fmt.Sprintf("dm:%s:%s", text, now)
h := sha256.Sum256([]byte(hashInput))
hash := hex.EncodeToString(h[:])[:16]
var snr, rssi *float64
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
} else if v, ok := msg["snr"]; ok {
if f, ok := toFloat64(v); ok {
snr = &f
}
}
if v, ok := msg["RSSI"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
} else if v, ok := msg["rssi"]; ok {
if f, ok := toFloat64(v); ok {
rssi = &f
}
}
pktData := &PacketData{
Timestamp: now,
ObserverID: "companion",
ObserverName: "L1 Pro (BLE)",
SNR: snr,
RSSI: rssi,
Hash: hash,
RouteType: 1, // FLOOD
PayloadType: 2, // TXT_MSG
PathJSON: "[]",
DecodedJSON: string(decodedJSON),
}
if _, err := store.InsertTransmission(pktData); err != nil {
log.Printf("MQTT [%s] DM insert error: %v", tag, err)
}
log.Printf("MQTT [%s] direct message from %s", tag, firstNonEmpty(sender, "unknown"))
return
}
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int64:
return float64(n), true
case json.Number:
f, err := n.Float64()
return f, err == nil
default:
return 0, false
}
}
func firstNonEmpty(vals ...string) string {
for _, v := range vals {
if v != "" {
return v
}
}
return ""
}
// loadChannelKeys loads channel decryption keys from config and/or a JSON file.
// Priority: CHANNEL_KEYS_PATH env var > cfg.ChannelKeysPath > channel-rainbow.json next to config.
func loadChannelKeys(cfg *Config, configPath string) map[string]string {
keys := make(map[string]string)
// Determine file path for rainbow keys
keysPath := os.Getenv("CHANNEL_KEYS_PATH")
if keysPath == "" {
keysPath = cfg.ChannelKeysPath
}
if keysPath == "" {
// Default: look for channel-rainbow.json next to config file
keysPath = filepath.Join(filepath.Dir(configPath), "channel-rainbow.json")
}
if data, err := os.ReadFile(keysPath); err == nil {
var fileKeys map[string]string
if err := json.Unmarshal(data, &fileKeys); err == nil {
for k, v := range fileKeys {
keys[k] = v
}
log.Printf("Loaded %d channel keys from %s", len(fileKeys), keysPath)
} else {
log.Printf("Warning: failed to parse channel keys file %s: %v", keysPath, err)
}
}
// Merge inline config keys (override file keys)
for k, v := range cfg.ChannelKeys {
keys[k] = v
}
return keys
}
// Version info (set via ldflags)
var version = "dev"
func init() {
if len(os.Args) > 1 && os.Args[1] == "--version" {
fmt.Println("meshcore-ingestor", version)
os.Exit(0)
}
}

View File

@@ -177,6 +177,15 @@
tbl.id = tbl.id || `analytics-tbl-${tab}-${i}`;
if (typeof makeColumnsResizable === 'function') makeColumnsResizable('#' + tbl.id, `meshcore-analytics-${tab}-${i}-col-widths`);
});
// #206 — Wrap analytics tables in scroll containers on mobile
el.querySelectorAll('.analytics-table').forEach(tbl => {
if (!tbl.parentElement.classList.contains('analytics-table-scroll')) {
const wrapper = document.createElement('div');
wrapper.className = 'analytics-table-scroll';
tbl.parentElement.insertBefore(wrapper, tbl);
wrapper.appendChild(tbl);
}
});
});
// Deep-link scroll to section within tab
const sectionId = new URLSearchParams((location.hash.split('?')[1] || '')).get('section');
@@ -395,7 +404,7 @@
function renderSNRByType(snrByType) {
if (!snrByType.length) return '<div class="text-muted">No data</div>';
let html = '<table class="analytics-table"><thead><tr><th>Type</th><th>Packets</th><th>Avg SNR</th><th>Min</th><th>Max</th><th>Distribution</th></tr></thead><tbody>';
let html = '<table class="analytics-table"><thead><tr><th scope="col">Type</th><th scope="col">Packets</th><th scope="col">Avg SNR</th><th scope="col">Min</th><th scope="col">Max</th><th scope="col">Distribution</th></tr></thead><tbody>';
snrByType.forEach(t => {
const barPct = Math.max(((t.avg - (-12)) / 27) * 100, 2);
const color = t.avg > 6 ? statusGreen() : t.avg > 0 ? statusYellow() : statusRed();
@@ -535,7 +544,7 @@
function renderPairTable(pairs) {
if (!pairs.length) return '<div class="text-muted">Not enough multi-hop data</div>';
let html = '<table class="analytics-table"><thead><tr><th>Node A</th><th>Node B</th><th>Co-appearances</th></tr></thead><tbody>';
let html = '<table class="analytics-table"><thead><tr><th scope="col">Node A</th><th scope="col">Node B</th><th scope="col">Co-appearances</th></tr></thead><tbody>';
pairs.slice(0, 12).forEach(p => {
html += `<tr>
<td>${p.nameA ? `<a href="#/nodes/${encodeURIComponent(p.pubkeyA)}" class="analytics-link">${esc(p.nameA)}</a>` : `<span class="mono">${p.hopA}</span>`}</td>
@@ -598,7 +607,7 @@
function renderCrossObserver(nodes) {
if (!nodes.length) return '<div class="text-muted">No nodes seen by multiple observers</div>';
let html = `<table class="analytics-table">
<thead><tr><th>Node</th><th>Observers</th><th>Hop Distances</th></tr></thead><tbody>`;
<thead><tr><th scope="col">Node</th><th scope="col">Observers</th><th scope="col">Hop Distances</th></tr></thead><tbody>`;
nodes.forEach(n => {
const name = n.name
? `<a href="#/nodes/${encodeURIComponent(n.pubkey)}" class="analytics-link">${esc(n.name)}</a>`
@@ -719,7 +728,7 @@
var ths = '';
for (var i = 0; i < cols.length; i++) {
var c = cols[i];
ths += '<th class="sortable' + (c.key === activeCol ? ' sort-active' : '') + '" data-sort-col="' + c.key + '">' +
ths += '<th scope="col" class="sortable' + (c.key === activeCol ? ' sort-active' : '') + '" data-sort-col="' + c.key + '">' +
c.label + channelSortArrow(c.key, activeCol, dir) + '</th>';
}
return '<thead><tr>' + ths + '</tr></thead>';
@@ -880,7 +889,7 @@
<p class="text-muted">Nodes advertising with 2+ byte hash paths</p>
${data.multiByteNodes.length ? `
<table class="analytics-table">
<thead><tr><th>Node</th><th>Hash Size</th><th>Adverts</th><th>Last Seen</th></tr></thead>
<thead><tr><th scope="col">Node</th><th scope="col">Hash Size</th><th scope="col">Adverts</th><th scope="col">Last Seen</th></tr></thead>
<tbody>
${data.multiByteNodes.map(n => `<tr class="clickable-row" data-action="navigate" data-value="#/nodes/${n.pubkey ? encodeURIComponent(n.pubkey) : ''}" tabindex="0" role="row">
<td><strong>${esc(n.name)}</strong></td>
@@ -896,7 +905,7 @@
<div class="analytics-card flex-1">
<h3>Top Path Hops</h3>
<table class="analytics-table">
<thead><tr><th>Hop</th><th>Node</th><th>Bytes</th><th>Appearances</th></tr></thead>
<thead><tr><th scope="col">Hop</th><th scope="col">Node</th><th scope="col">Bytes</th><th scope="col">Appearances</th></tr></thead>
<tbody>
${data.topHops.map(h => {
const link = h.pubkey ? `#/nodes/${encodeURIComponent(h.pubkey)}` : `#/packets?search=${h.hex}`;
@@ -952,7 +961,7 @@
ihEl.innerHTML = '<div class="text-muted" style="padding:4px">✅ No inconsistencies detected — all nodes are reporting consistent hash sizes.</div>';
} else {
ihEl.innerHTML = `<table class="analytics-table" style="background:var(--card-bg);border:1px solid var(--border);border-radius:8px;overflow:hidden">
<thead><tr><th>Node</th><th>Role</th><th>Current Hash</th><th>Sizes Seen</th></tr></thead>
<thead><tr><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Current Hash</th><th scope="col">Sizes Seen</th></tr></thead>
<tbody>${inconsistent.map((n, i) => {
const roleColor = window.ROLE_COLORS?.[n.role] || '#6b7280';
const prefix = n.hash_size ? n.public_key.slice(0, n.hash_size * 2).toUpperCase() : '?';
@@ -1123,7 +1132,7 @@
collisions.sort((a, b) => classOrder[a.classification] - classOrder[b.classification] || b.count - a.count);
el.innerHTML = `<table class="analytics-table">
<thead><tr><th>Hop</th><th>Appearances</th><th>Max Distance</th><th>Assessment</th><th>Colliding Nodes</th></tr></thead>
<thead><tr><th scope="col">Hop</th><th scope="col">Appearances</th><th scope="col">Max Distance</th><th scope="col">Assessment</th><th scope="col">Colliding Nodes</th></tr></thead>
<tbody>${collisions.map(c => {
let badge, tooltip;
if (c.classification === 'local') {
@@ -1179,7 +1188,7 @@
return `<h4>${title}</h4>
<p class="text-muted" style="margin:4px 0 8px">From ${data.totalPaths.toLocaleString()} paths with 2+ hops</p>
<table class="analytics-table"><thead><tr>
<th>#</th><th>Route</th><th>Occurrences</th><th>% of paths</th><th>Frequency</th>
<th scope="col">#</th><th scope="col">Route</th><th scope="col">Occurrences</th><th scope="col">% of paths</th><th scope="col">Frequency</th>
</tr></thead><tbody>
${data.subpaths.map((s, i) => {
const barW = Math.max(2, Math.round(s.count / maxCount * 100));
@@ -1434,7 +1443,7 @@
${myKeys.size ? `<h3>⭐ My Claimed Nodes</h3>
<table class="analytics-table" style="margin-bottom:24px">
<thead><tr><th>Node</th><th>Role</th><th>Packets</th><th>Avg SNR</th><th>Observers</th><th>Last Heard</th></tr></thead>
<thead><tr><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Packets</th><th scope="col">Avg SNR</th><th scope="col">Observers</th><th scope="col">Last Heard</th></tr></thead>
<tbody>
${enriched.filter(n => myKeys.has(n.public_key)).map(n => {
const s = n.health.stats;
@@ -1452,7 +1461,7 @@
<h3>🏆 Most Active Nodes</h3>
<table class="analytics-table" style="margin-bottom:24px">
<thead><tr><th>#</th><th>Node</th><th>Role</th><th>Total Packets</th><th>Packets Today</th><th>Analytics</th></tr></thead>
<thead><tr><th scope="col">#</th><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Total Packets</th><th scope="col">Packets Today</th><th scope="col">Analytics</th></tr></thead>
<tbody>
${byPackets.slice(0, 15).map((n, i) => `<tr>
<td>${i + 1}</td>
@@ -1467,7 +1476,7 @@
<h3>📶 Best Signal Quality</h3>
<table class="analytics-table" style="margin-bottom:24px">
<thead><tr><th>#</th><th>Node</th><th>Role</th><th>Avg SNR</th><th>Observers</th><th>Analytics</th></tr></thead>
<thead><tr><th scope="col">#</th><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Avg SNR</th><th scope="col">Observers</th><th scope="col">Analytics</th></tr></thead>
<tbody>
${bySnr.slice(0, 15).map((n, i) => `<tr>
<td>${i + 1}</td>
@@ -1482,7 +1491,7 @@
<h3>👀 Most Observed Nodes</h3>
<table class="analytics-table" style="margin-bottom:24px">
<thead><tr><th>#</th><th>Node</th><th>Role</th><th>Observers</th><th>Avg SNR</th><th>Analytics</th></tr></thead>
<thead><tr><th scope="col">#</th><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Observers</th><th scope="col">Avg SNR</th><th scope="col">Analytics</th></tr></thead>
<tbody>
${byObservers.slice(0, 15).map((n, i) => `<tr>
<td>${i + 1}</td>
@@ -1497,7 +1506,7 @@
<h3>⏰ Recently Active</h3>
<table class="analytics-table" style="margin-bottom:24px">
<thead><tr><th>Node</th><th>Role</th><th>Last Heard</th><th>Packets Today</th><th>Analytics</th></tr></thead>
<thead><tr><th scope="col">Node</th><th scope="col">Role</th><th scope="col">Last Heard</th><th scope="col">Packets Today</th><th scope="col">Analytics</th></tr></thead>
<tbody>
${byRecent.slice(0, 15).map(n => `<tr>
<td>${nodeLink(n)}${claimedBadge(n)}</td>
@@ -1529,7 +1538,7 @@
// Category stats
const cats = data.catStats;
html += `<div class="analytics-section"><h3>Distance by Link Type</h3><table class="data-table"><thead><tr><th>Type</th><th>Count</th><th>Avg (km)</th><th>Median (km)</th><th>Min (km)</th><th>Max (km)</th></tr></thead><tbody>`;
html += `<div class="analytics-section"><h3>Distance by Link Type</h3><table class="data-table"><thead><tr><th scope="col">Type</th><th scope="col">Count</th><th scope="col">Avg (km)</th><th scope="col">Median (km)</th><th scope="col">Min (km)</th><th scope="col">Max (km)</th></tr></thead><tbody>`;
for (const [cat, st] of Object.entries(cats)) {
if (!st.count) continue;
html += `<tr><td><strong>${esc(cat)}</strong></td><td>${st.count.toLocaleString()}</td><td>${st.avg}</td><td>${st.median}</td><td>${st.min}</td><td>${st.max}</td></tr>`;
@@ -1549,7 +1558,7 @@
}
// Top hops leaderboard
html += `<div class="analytics-section"><h3>🏆 Top 20 Longest Hops</h3><table class="data-table"><thead><tr><th>#</th><th>From</th><th>To</th><th>Distance (km)</th><th>Type</th><th>SNR</th><th>Packet</th><th></th></tr></thead><tbody>`;
html += `<div class="analytics-section"><h3>🏆 Top 20 Longest Hops</h3><table class="data-table"><thead><tr><th scope="col">#</th><th scope="col">From</th><th scope="col">To</th><th scope="col">Distance (km)</th><th scope="col">Type</th><th scope="col">SNR</th><th scope="col">Packet</th><th scope="col"></th></tr></thead><tbody>`;
const top20 = data.topHops.slice(0, 20);
top20.forEach((h, i) => {
const fromLink = h.fromPk ? `<a href="#/nodes/${encodeURIComponent(h.fromPk)}" class="analytics-link">${esc(h.fromName)}</a>` : esc(h.fromName || '?');
@@ -1563,7 +1572,7 @@
// Top paths
if (data.topPaths.length) {
html += `<div class="analytics-section"><h3>🛤️ Top 10 Longest Multi-Hop Paths</h3><table class="data-table"><thead><tr><th>#</th><th>Total Distance (km)</th><th>Hops</th><th>Route</th><th>Packet</th><th></th></tr></thead><tbody>`;
html += `<div class="analytics-section"><h3>🛤️ Top 10 Longest Multi-Hop Paths</h3><table class="data-table"><thead><tr><th scope="col">#</th><th scope="col">Total Distance (km)</th><th scope="col">Hops</th><th scope="col">Route</th><th scope="col">Packet</th><th scope="col"></th></tr></thead><tbody>`;
data.topPaths.slice(0, 10).forEach((p, i) => {
const route = p.hops.map(h => esc(h.fromName)).concat(esc(p.hops[p.hops.length-1].toName)).join(' → ');
const pktLink = p.hash ? `<a href="#/packet/${encodeURIComponent(p.hash)}" class="analytics-link mono" style="font-size:0.85em">${esc(p.hash.slice(0, 12))}…</a>` : '—';

View File

@@ -523,21 +523,21 @@ window.addEventListener('DOMContentLoaded', () => {
const pktList = packets.packets || packets;
if (Array.isArray(pktList)) {
for (const p of pktList.slice(0, 5)) {
html += `<div class="search-result-item" onclick="location.hash='#/packets/${p.packet_hash || p.hash || p.id}';document.getElementById('searchOverlay').classList.add('hidden')">
html += `<div class="search-result-item" tabindex="0" role="option" data-href="#/packets/${p.packet_hash || p.hash || p.id}">
<span class="search-result-type">Packet</span>${truncate(p.packet_hash || '', 16)}${payloadTypeName(p.payload_type)}</div>`;
}
}
const nodeList = Array.isArray(nodes) ? nodes : (nodes.nodes || []);
for (const n of nodeList.slice(0, 5)) {
if (n.name && n.name.toLowerCase().includes(q.toLowerCase())) {
html += `<div class="search-result-item" onclick="location.hash='#/nodes/${n.public_key}';document.getElementById('searchOverlay').classList.add('hidden')">
html += `<div class="search-result-item" tabindex="0" role="option" data-href="#/nodes/${n.public_key}">
<span class="search-result-type">Node</span>${n.name}${truncate(n.public_key || '', 16)}</div>`;
}
}
const chList = Array.isArray(channels) ? channels : [];
for (const c of chList) {
if (c.name && c.name.toLowerCase().includes(q.toLowerCase())) {
html += `<div class="search-result-item" onclick="location.hash='#/channels/${c.channel_hash}';document.getElementById('searchOverlay').classList.add('hidden')">
html += `<div class="search-result-item" tabindex="0" role="option" data-href="#/channels/${c.channel_hash}">
<span class="search-result-type">Channel</span>${c.name}</div>`;
}
}
@@ -547,6 +547,40 @@ window.addEventListener('DOMContentLoaded', () => {
}, 300);
});
// #208 — Search results keyboard: click, Enter/Space, arrow-key navigation
function activateSearchItem(item) {
if (!item || !item.dataset.href) return;
location.hash = item.dataset.href;
searchOverlay.classList.add('hidden');
}
searchResults.addEventListener('click', (e) => {
activateSearchItem(e.target.closest('.search-result-item'));
});
searchResults.addEventListener('keydown', (e) => {
const item = e.target.closest('.search-result-item');
if (!item) return;
if (e.key === 'Enter' || e.key === ' ') {
e.preventDefault();
activateSearchItem(item);
} else if (e.key === 'ArrowDown') {
e.preventDefault();
const next = item.nextElementSibling;
if (next && next.classList.contains('search-result-item')) next.focus();
} else if (e.key === 'ArrowUp') {
e.preventDefault();
const prev = item.previousElementSibling;
if (prev && prev.classList.contains('search-result-item')) prev.focus();
else searchInput.focus();
}
});
searchInput.addEventListener('keydown', (e) => {
if (e.key === 'ArrowDown') {
e.preventDefault();
const first = searchResults.querySelector('.search-result-item');
if (first) first.focus();
}
});
// --- Login ---
// (removed — no auth yet)

View File

@@ -263,7 +263,7 @@
<div class="alab-section">
<h3>🎹 Note Sequence</h3>
<table class="alab-note-table">
<tr><th></th><th>#</th><th>Payload Index</th><th>Byte</th><th>→ MIDI</th><th>→ Freq</th><th>Duration (why)</th><th>Gap (why)</th></tr>
<tr><th scope="col"></th><th scope="col">#</th><th scope="col">Payload Index</th><th scope="col">Byte</th><th scope="col">→ MIDI</th><th scope="col">→ Freq</th><th scope="col">Duration (why)</th><th scope="col">Gap (why)</th></tr>
${m.notes.map((n, i) => {
const durWhy = `byte ${n.byte} → map(0...255 → 50...400ms) × tempo`;
const gapWhy = i < m.notes.length - 1

View File

@@ -57,6 +57,15 @@ if (typeof window !== 'undefined') window.comparePacketSets = comparePacketSets;
'<div id="compareContent"></div>' +
'</div>';
// #209 — Keyboard accessibility for compare table rows
app.addEventListener('keydown', function (e) {
var row = e.target.closest('tr[data-action="navigate"]');
if (!row) return;
if (e.key !== 'Enter' && e.key !== ' ') return;
e.preventDefault();
location.hash = row.dataset.value;
});
loadObservers();
}
@@ -316,9 +325,9 @@ if (typeof window !== 'undefined') window.comparePacketSets = comparePacketSets;
el.innerHTML =
(hashes.length > displayLimit ? '<div class="text-muted" style="margin-bottom:8px">Showing first ' + displayLimit + ' of ' + hashes.length.toLocaleString() + ' packets.</div>' : '') +
'<table class="data-table compare-table">' +
'<div class="analytics-table-scroll"><table class="data-table compare-table">' +
'<thead><tr>' +
'<th>Hash</th><th>Time</th><th>Type</th><th>Observer</th>' +
'<th scope="col">Hash</th><th scope="col">Time</th><th scope="col">Type</th><th scope="col">Observer</th>' +
'</tr></thead>' +
'<tbody>' + displayed.map(function (h) {
var p = mapA.get(h) || mapB.get(h);
@@ -332,7 +341,7 @@ if (typeof window !== 'undefined') window.comparePacketSets = comparePacketSets;
} else {
obsLabel = nameB;
}
return '<tr style="cursor:pointer" onclick="location.hash=\'#/packets/' + escapeHtml(h) + '\'">' +
return '<tr style="cursor:pointer" tabindex="0" role="row" data-action="navigate" data-value="#/packets/' + escapeHtml(h) + '" onclick="location.hash=\'#/packets/' + escapeHtml(h) + '\'">' +
'<td class="mono" style="font-size:0.85em">' + escapeHtml(h.substring(0, 12)) + '</td>' +
'<td>' + timeAgo(p.timestamp || p.first_seen) + '</td>' +
'<td><span class="payload-badge badge-' + payloadTypeColor(p.payload_type) + '">' + escapeHtml(typeName) + '</span></td>' +
@@ -340,7 +349,7 @@ if (typeof window !== 'undefined') window.comparePacketSets = comparePacketSets;
'</tr>';
}).join('') +
'</tbody>' +
'</table>';
'</table></div>';
}
registerPage('compare', { init: init, destroy: destroy });

View File

@@ -642,21 +642,21 @@
var b = state.branding;
var logoPreview = b.logoUrl ? '<img class="cust-preview-img" src="' + escAttr(b.logoUrl) + '" alt="Logo preview" onerror="this.style.display=\'none\'">' : '';
return '<div class="cust-panel' + (activeTab === 'branding' ? ' active' : '') + '" data-panel="branding">' +
'<div class="cust-field"><label>Site Name</label><input type="text" data-key="branding.siteName" value="' + escAttr(b.siteName) + '"></div>' +
'<div class="cust-field"><label>Tagline</label><input type="text" data-key="branding.tagline" value="' + escAttr(b.tagline) + '"></div>' +
'<div class="cust-field"><label>Logo URL</label><input type="text" data-key="branding.logoUrl" value="' + escAttr(b.logoUrl) + '" placeholder="https://...">' + logoPreview + '</div>' +
'<div class="cust-field"><label>Favicon URL</label><input type="text" data-key="branding.faviconUrl" value="' + escAttr(b.faviconUrl) + '" placeholder="https://..."></div>' +
'<div class="cust-field"><label for="cust-siteName">Site Name</label><input type="text" id="cust-siteName" data-key="branding.siteName" value="' + escAttr(b.siteName) + '"></div>' +
'<div class="cust-field"><label for="cust-tagline">Tagline</label><input type="text" id="cust-tagline" data-key="branding.tagline" value="' + escAttr(b.tagline) + '"></div>' +
'<div class="cust-field"><label for="cust-logoUrl">Logo URL</label><input type="text" id="cust-logoUrl" data-key="branding.logoUrl" value="' + escAttr(b.logoUrl) + '" placeholder="https://...">' + logoPreview + '</div>' +
'<div class="cust-field"><label for="cust-faviconUrl">Favicon URL</label><input type="text" id="cust-faviconUrl" data-key="branding.faviconUrl" value="' + escAttr(b.faviconUrl) + '" placeholder="https://..."></div>' +
'</div>';
}
function renderColorRow(key, val, def, dataAttr) {
var isFont = key === 'font' || key === 'mono';
var inputHtml = isFont
? '<input type="text" data-' + dataAttr + '="' + key + '" value="' + escAttr(val) + '" style="width:160px;font-size:11px;font-family:var(--mono);padding:4px 6px;border:1px solid var(--border);border-radius:4px;background:var(--input-bg);color:var(--text)">'
: '<input type="color" data-' + dataAttr + '="' + key + '" value="' + val + '">' +
? '<input type="text" id="cust-' + dataAttr + '-' + key + '" data-' + dataAttr + '="' + key + '" value="' + escAttr(val) + '" style="width:160px;font-size:11px;font-family:var(--mono);padding:4px 6px;border:1px solid var(--border);border-radius:4px;background:var(--input-bg);color:var(--text)">'
: '<input type="color" id="cust-' + dataAttr + '-' + key + '" data-' + dataAttr + '="' + key + '" value="' + val + '">' +
'<span class="cust-hex" data-hex="' + key + '">' + val + '</span>';
return '<div class="cust-color-row">' +
'<div><label>' + THEME_LABELS[key] + '</label>' +
'<div><label for="cust-' + dataAttr + '-' + key + '">' + THEME_LABELS[key] + '</label>' +
'<div class="cust-hint">' + (THEME_HINTS[key] || '') + '</div></div>' +
inputHtml +
(val !== def ? '<button class="cust-reset-btn" data-reset-theme="' + key + '">Reset</button>' : '') +
@@ -708,9 +708,9 @@
var val = state.nodeColors[key];
var def = DEFAULTS.nodeColors[key];
rows += '<div class="cust-color-row">' +
'<div><label>' + NODE_EMOJI[key] + ' ' + NODE_LABELS[key] + '</label>' +
'<div><label for="cust-node-' + key + '">' + NODE_EMOJI[key] + ' ' + NODE_LABELS[key] + '</label>' +
'<div class="cust-hint">' + (NODE_HINTS[key] || '') + '</div></div>' +
'<input type="color" data-node="' + key + '" value="' + val + '">' +
'<input type="color" id="cust-node-' + key + '" data-node="' + key + '" value="' + val + '">' +
'<span class="cust-node-dot" style="background:' + val + '" data-dot="' + key + '"></span>' +
'<span class="cust-hex" data-nhex="' + key + '">' + val + '</span>' +
(val !== def ? '<button class="cust-reset-btn" data-reset-node="' + key + '">Reset</button>' : '') +
@@ -721,9 +721,9 @@
var tval = state.typeColors[tkey];
var tdef = DEFAULTS.typeColors[tkey];
typeRows += '<div class="cust-color-row">' +
'<div><label>' + (TYPE_EMOJI[tkey] || '') + ' ' + TYPE_LABELS[tkey] + '</label>' +
'<div><label for="cust-type-' + tkey + '">' + (TYPE_EMOJI[tkey] || '') + ' ' + TYPE_LABELS[tkey] + '</label>' +
'<div class="cust-hint">' + (TYPE_HINTS[tkey] || '') + '</div></div>' +
'<input type="color" data-type-color="' + tkey + '" value="' + tval + '">' +
'<input type="color" id="cust-type-' + tkey + '" data-type-color="' + tkey + '" value="' + tval + '">' +
'<span class="cust-node-dot" style="background:' + tval + '" data-tdot="' + tkey + '"></span>' +
'<span class="cust-hex" data-thex="' + tkey + '">' + tval + '</span>' +
(tval !== tdef ? '<button class="cust-reset-btn" data-reset-type="' + tkey + '">Reset</button>' : '') +
@@ -742,13 +742,13 @@
'<hr style="border:none;border-top:1px solid var(--border);margin:16px 0">' +
'<p class="cust-section-title">Heatmap Opacity</p>' +
'<div class="cust-color-row">' +
'<div><label>🗺️ Nodes Map</label>' +
'<div><label for="custHeatOpacity">🗺️ Nodes Map</label>' +
'<div class="cust-hint">Heatmap overlay on the Nodes → Map page (0–100%)</div></div>' +
'<input type="range" id="custHeatOpacity" min="0" max="100" value="' + heatPct + '" style="width:120px;cursor:pointer">' +
'<span id="custHeatOpacityVal" style="font-family:var(--mono);font-size:12px;color:var(--text-muted);min-width:36px">' + heatPct + '%</span>' +
'</div>' +
'<div class="cust-color-row">' +
'<div><label>📡 Live Map</label>' +
'<div><label for="custLiveHeatOpacity">📡 Live Map</label>' +
'<div class="cust-hint">Heatmap overlay on the Live page (0–100%)</div></div>' +
'<input type="range" id="custLiveHeatOpacity" min="0" max="100" value="' + liveHeatPct + '" style="width:120px;cursor:pointer">' +
'<span id="custLiveHeatOpacityVal" style="font-family:var(--mono);font-size:12px;color:var(--text-muted);min-width:36px">' + liveHeatPct + '%</span>' +
@@ -761,13 +761,13 @@
var stepsHtml = h.steps.map(function (s, i) {
return '<div class="cust-list-item" data-step="' + i + '">' +
'<div class="cust-list-row">' +
'<input class="cust-emoji-input" data-step-field="emoji" data-idx="' + i + '" value="' + escAttr(s.emoji) + '" placeholder="📡">' +
'<input data-step-field="title" data-idx="' + i + '" value="' + escAttr(s.title) + '" placeholder="Title">' +
'<input class="cust-emoji-input" data-step-field="emoji" data-idx="' + i + '" value="' + escAttr(s.emoji) + '" placeholder="📡" aria-label="Step ' + (i + 1) + ' emoji">' +
'<input data-step-field="title" data-idx="' + i + '" value="' + escAttr(s.title) + '" placeholder="Title" aria-label="Step ' + (i + 1) + ' title">' +
'<button class="cust-list-btn" data-move-step="' + i + '" data-dir="up" title="Move up">↑</button>' +
'<button class="cust-list-btn" data-move-step="' + i + '" data-dir="down" title="Move down">↓</button>' +
'<button class="cust-list-btn danger" data-rm-step="' + i + '" title="Remove">✕</button>' +
'</div>' +
'<textarea data-step-field="description" data-idx="' + i + '" placeholder="Description" rows="2">' + esc(s.description) + '</textarea>' +
'<textarea data-step-field="description" data-idx="' + i + '" placeholder="Description" rows="2" aria-label="Step ' + (i + 1) + ' description">' + esc(s.description) + '</textarea>' +
'<div class="cust-md-hint">Markdown: <code>**bold**</code> <code>*italic*</code> <code>`code`</code> <code>[text](url)</code> <code>- list</code></div>' +
'</div>';
}).join('');
@@ -775,10 +775,10 @@
var checkHtml = h.checklist.map(function (c, i) {
return '<div class="cust-list-item" data-check="' + i + '">' +
'<div class="cust-list-row">' +
'<input data-check-field="question" data-idx="' + i + '" value="' + escAttr(c.question) + '" placeholder="Question">' +
'<input data-check-field="question" data-idx="' + i + '" value="' + escAttr(c.question) + '" placeholder="Question" aria-label="Checklist item ' + (i + 1) + ' question">' +
'<button class="cust-list-btn danger" data-rm-check="' + i + '" title="Remove">✕</button>' +
'</div>' +
'<textarea data-check-field="answer" data-idx="' + i + '" placeholder="Answer" rows="2">' + esc(c.answer) + '</textarea>' +
'<textarea data-check-field="answer" data-idx="' + i + '" placeholder="Answer" rows="2" aria-label="Checklist item ' + (i + 1) + ' answer">' + esc(c.answer) + '</textarea>' +
'<div class="cust-md-hint">Markdown: <code>**bold**</code> <code>*italic*</code> <code>`code`</code> <code>[text](url)</code> <code>- list</code></div>' +
'</div>';
}).join('');
@@ -786,16 +786,16 @@
var linksHtml = h.footerLinks.map(function (l, i) {
return '<div class="cust-list-item" data-link="' + i + '">' +
'<div class="cust-list-row">' +
'<input data-link-field="label" data-idx="' + i + '" value="' + escAttr(l.label) + '" placeholder="Label">' +
'<input data-link-field="label" data-idx="' + i + '" value="' + escAttr(l.label) + '" placeholder="Label" aria-label="Footer link ' + (i + 1) + ' label">' +
'<button class="cust-list-btn danger" data-rm-link="' + i + '" title="Remove">✕</button>' +
'</div>' +
'<input data-link-field="url" data-idx="' + i + '" value="' + escAttr(l.url) + '" placeholder="URL">' +
'<input data-link-field="url" data-idx="' + i + '" value="' + escAttr(l.url) + '" placeholder="URL" aria-label="Footer link ' + (i + 1) + ' URL">' +
'</div>';
}).join('');
return '<div class="cust-panel' + (activeTab === 'home' ? ' active' : '') + '" data-panel="home">' +
'<div class="cust-field"><label>Hero Title</label><input type="text" data-key="home.heroTitle" value="' + escAttr(h.heroTitle) + '"></div>' +
'<div class="cust-field"><label>Hero Subtitle</label><input type="text" data-key="home.heroSubtitle" value="' + escAttr(h.heroSubtitle) + '"></div>' +
'<div class="cust-field"><label for="cust-heroTitle">Hero Title</label><input type="text" id="cust-heroTitle" data-key="home.heroTitle" value="' + escAttr(h.heroTitle) + '"></div>' +
'<div class="cust-field"><label for="cust-heroSubtitle">Hero Subtitle</label><input type="text" id="cust-heroSubtitle" data-key="home.heroSubtitle" value="' + escAttr(h.heroSubtitle) + '"></div>' +
'<p class="cust-section-title" style="margin-top:20px">Steps</p>' + stepsHtml +
'<button class="cust-add-btn" id="addStep">+ Add Step</button>' +
'<p class="cust-section-title" style="margin-top:24px">FAQ / Checklist</p>' + checkHtml +
@@ -870,11 +870,11 @@
'<div class="cust-export-btns" style="margin-bottom:12px">' +
'<button class="cust-dl-btn" id="custDownload">💾 Download theme.json</button>' +
'<button class="cust-dl-btn" id="custImportFile">📂 Import File</button>' +
'<input type="file" id="custImportInput" accept=".json,application/json" style="display:none">' +
'<input type="file" id="custImportInput" accept=".json,application/json" style="display:none" aria-label="Import theme file">' +
'<button class="cust-copy-btn" id="custCopy">📋 Copy</button>' +
'</div>' +
'<details style="margin-top:8px"><summary style="font-size:12px;font-weight:600;cursor:pointer;color:var(--text-muted)">Raw JSON</summary>' +
'<textarea class="cust-export-area" id="custExportJson" style="margin-top:8px">' + esc(json) + '</textarea>' +
'<textarea class="cust-export-area" id="custExportJson" style="margin-top:8px" aria-label="Theme JSON data">' + esc(json) + '</textarea>' +
'</details>' +
'</div>';
}

View File

@@ -22,9 +22,9 @@
<meta name="twitter:title" content="MeshCore Analyzer">
<meta name="twitter:description" content="Real-time MeshCore LoRa mesh network analyzer — live packet visualization, node tracking, channel decryption, and route analysis.">
<meta name="twitter:image" content="https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/public/og-image.png">
<link rel="stylesheet" href="style.css?v=1774685398">
<link rel="stylesheet" href="home.css?v=1774685398">
<link rel="stylesheet" href="live.css?v=1774685398">
<link rel="stylesheet" href="style.css?v=1774690966">
<link rel="stylesheet" href="home.css?v=1774690966">
<link rel="stylesheet" href="live.css?v=1774690966">
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
crossorigin="anonymous">
@@ -74,36 +74,36 @@
<div id="searchOverlay" class="search-overlay hidden" aria-label="Search packets, nodes, channels">
<div class="search-box">
<input type="text" id="searchInput" placeholder="Search packets, nodes, channels…" autofocus>
<div id="searchResults" class="search-results"></div>
<div id="searchResults" class="search-results" role="listbox"></div>
</div>
</div>
<main id="app" role="main"></main>
<script src="vendor/qrcode.js"></script>
<script src="roles.js?v=1774685398"></script>
<script src="customize.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="region-filter.js?v=1774685398"></script>
<script src="hop-resolver.js?v=1774685398"></script>
<script src="hop-display.js?v=1774685398"></script>
<script src="app.js?v=1774685398"></script>
<script src="home.js?v=1774685398"></script>
<script src="packet-filter.js?v=1774685398"></script>
<script src="packets.js?v=1774685398"></script>
<script src="map.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="channels.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="nodes.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="traces.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="analytics.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-v1-constellation.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-v2-constellation.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-lab.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="live.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observers.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observer-detail.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="compare.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="node-analytics.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="perf.js?v=1774685398" onerror="console.error('Failed to load:', this.src)"></script>
<script src="roles.js?v=1774690966"></script>
<script src="customize.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="region-filter.js?v=1774690966"></script>
<script src="hop-resolver.js?v=1774690966"></script>
<script src="hop-display.js?v=1774690966"></script>
<script src="app.js?v=1774690966"></script>
<script src="home.js?v=1774690966"></script>
<script src="packet-filter.js?v=1774690966"></script>
<script src="packets.js?v=1774690966"></script>
<script src="map.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="channels.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="nodes.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="traces.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="analytics.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-v1-constellation.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-v2-constellation.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="audio-lab.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="live.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observers.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observer-detail.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="compare.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="node-analytics.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
<script src="perf.js?v=1774690966" onerror="console.error('Failed to load:', this.src)"></script>
</body>
</html>
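Every `?v=` query string in the file above flips from 1774685398 to 1774690966 in lockstep. A hypothetical helper (not from this repo, which may bump these by hand or in a build step) that performs the same mechanical rewrite:

```javascript
// Rewrite every "?v=<digits>" cache-busting query string to a single new
// version, defaulting to the current Unix epoch (matching the values above).
function bumpCacheVersion(html, version) {
  const v = version !== undefined ? version : Math.floor(Date.now() / 1000);
  return html.replace(/\?v=\d+/g, '?v=' + v);
}
```

Running it over `index.html` once per deploy keeps all assets on one version, so a partial cache never mixes old and new scripts.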

View File

@@ -286,6 +286,9 @@
.live-stat-pill { font-size: 11px; padding: 2px 7px; }
.live-toggles { font-size: 10px; gap: 6px; margin-left: 0; }
.live-title { font-size: 12px; letter-spacing: 1px; }
/* #203 — bottom-sheet node detail on mobile */
.live-node-detail { width: 100%; right: 0; left: 0; top: auto; bottom: 0; max-height: 60vh; border-radius: 16px 16px 0 0; overflow-y: auto; }
.live-node-detail.hidden { transform: translateY(100%); }
.feed-detail-card {
position: fixed !important;
right: 0 !important;
@@ -663,9 +666,9 @@
.vcr-mode { display: none; }
/* Row 2: timeline takes full width */
.vcr-timeline-container { order: 4; width: 100%; flex: none; height: 20px; }
/* Smaller buttons */
.vcr-btn { padding: 4px 8px; font-size: 0.75rem; min-height: 32px; min-width: 32px; }
.vcr-scope-btn { font-size: 0.6rem; padding: 2px 6px; min-height: 28px; }
/* #207 — 44px touch targets for VCR buttons */
.vcr-btn { padding: 4px 8px; font-size: 0.75rem; min-height: 44px; min-width: 44px; }
.vcr-scope-btn { font-size: 0.6rem; padding: 2px 6px; min-height: 44px; min-width: 44px; }
.vcr-prompt { order: 5; width: 100%; font-size: 0.7rem; }
}

View File

@@ -102,27 +102,27 @@
<div class="analytics-chart-card full">
<h4>Activity Timeline</h4>
<div class="analytics-chart-desc">Packet count per time bucket — shows when this node is most active</div>
<canvas id="activityChart"></canvas>
<canvas id="activityChart" role="img" aria-label="Activity timeline chart"></canvas>
</div>
<div class="analytics-chart-card">
<h4>SNR Trend</h4>
<div class="analytics-chart-desc">Signal-to-noise ratio over time — higher is better reception</div>
<canvas id="snrChart"></canvas>
<canvas id="snrChart" role="img" aria-label="SNR trend chart"></canvas>
</div>
<div class="analytics-chart-card">
<h4>Packet Types</h4>
<div class="analytics-chart-desc">Breakdown of advert, position, text, and other packet types</div>
<canvas id="packetTypeChart"></canvas>
<canvas id="packetTypeChart" role="img" aria-label="Packet types chart"></canvas>
</div>
<div class="analytics-chart-card">
<h4>Observer Coverage</h4>
<div class="analytics-chart-desc">Which stations hear this node and how often</div>
<canvas id="observerChart"></canvas>
<canvas id="observerChart" role="img" aria-label="Observer coverage chart"></canvas>
</div>
<div class="analytics-chart-card">
<h4>Hop Distribution</h4>
<div class="analytics-chart-desc">How many repeater hops packets take — 0 means direct</div>
<canvas id="hopChart"></canvas>
<canvas id="hopChart" role="img" aria-label="Hop distribution chart"></canvas>
</div>
<div class="analytics-chart-card full">
<h4>Uptime Heatmap</h4>
@@ -132,14 +132,14 @@
${data.peerInteractions.length ? `<div class="analytics-chart-card full">
<h4>Peer Interactions</h4>
<div class="analytics-chart-desc">Nodes this device has exchanged messages with</div>
<table class="analytics-peer-table">
<thead><tr><th>Peer</th><th>Messages</th><th>Last Contact</th></tr></thead>
<div class="analytics-table-scroll"><table class="analytics-peer-table">
<thead><tr><th scope="col">Peer</th><th scope="col">Messages</th><th scope="col">Last Contact</th></tr></thead>
<tbody>${data.peerInteractions.map(p => `<tr>
<td><a href="#/nodes/${encodeURIComponent(p.peer_key)}" style="color:var(--accent)">${escapeHtml(p.peer_name)}</a></td>
<td>${p.messageCount}</td>
<td>${timeAgo(p.lastContact)}</td>
</tr>`).join('')}</tbody>
</table>
</table></div>
</div>` : ''}
</div>
</div>`;

View File

@@ -287,7 +287,7 @@
${(() => { const regions = [...new Set(observers.map(o => o.iata).filter(Boolean))]; return regions.length ? `<div style="margin-bottom:8px"><strong>Regions:</strong> ${regions.map(r => '<span class="badge" style="margin:0 2px">' + escapeHtml(r) + '</span>').join(' ')}</div>` : ''; })()}
<h4>Heard By (${observers.length} observer${observers.length > 1 ? 's' : ''})</h4>
<table class="data-table" style="font-size:12px">
<thead><tr><th>Observer</th><th>Region</th><th>Packets</th><th>Avg SNR</th><th>Avg RSSI</th></tr></thead>
<thead><tr><th scope="col">Observer</th><th scope="col">Region</th><th scope="col">Packets</th><th scope="col">Avg SNR</th><th scope="col">Avg RSSI</th></tr></thead>
<tbody>
${observers.map(o => `<tr>
<td style="font-weight:600">${escapeHtml(o.observer_name || o.observer_id)}</td>
@@ -577,11 +577,11 @@
</div>
<table class="data-table" id="nodesTable">
<thead><tr>
<th class="sortable${sortState.column==='name'?' sort-active':''}" data-sort="name">Name${sortArrow('name')}</th>
<th class="sortable${sortState.column==='public_key'?' sort-active':''}" data-sort="public_key">Public Key${sortArrow('public_key')}</th>
<th class="sortable${sortState.column==='role'?' sort-active':''}" data-sort="role">Role${sortArrow('role')}</th>
<th class="sortable${sortState.column==='last_seen'?' sort-active':''}" data-sort="last_seen">Last Seen${sortArrow('last_seen')}</th>
<th class="sortable${sortState.column==='advert_count'?' sort-active':''}" data-sort="advert_count">Adverts${sortArrow('advert_count')}</th>
<th scope="col" class="sortable${sortState.column==='name'?' sort-active':''}" data-sort="name">Name${sortArrow('name')}</th>
<th scope="col" class="col-pubkey sortable${sortState.column==='public_key'?' sort-active':''}" data-sort="public_key">Public Key${sortArrow('public_key')}</th>
<th scope="col" class="sortable${sortState.column==='role'?' sort-active':''}" data-sort="role">Role${sortArrow('role')}</th>
<th scope="col" class="sortable${sortState.column==='last_seen'?' sort-active':''}" data-sort="last_seen">Last Seen${sortArrow('last_seen')}</th>
<th scope="col" class="sortable${sortState.column==='advert_count'?' sort-active':''}" data-sort="advert_count">Adverts${sortArrow('advert_count')}</th>
</tr></thead>
<tbody id="nodesBody"></tbody>
</table>`;
@@ -674,7 +674,7 @@
const lastSeenClass = status === 'active' ? 'last-seen-active' : 'last-seen-stale';
return `<tr data-key="${n.public_key}" data-action="select" data-value="${n.public_key}" tabindex="0" role="row" class="${selectedKey === n.public_key ? 'selected' : ''}${isClaimed ? ' claimed-row' : ''}">
<td>${favStar(n.public_key, 'node-fav')}${isClaimed ? '<span class="claimed-badge" title="My Mesh">★</span> ' : ''}<strong>${n.name || '(unnamed)'}</strong>${dupNameBadge(n.name, n.public_key, dupMap)}</td>
<td class="mono">${truncate(n.public_key, 16)}</td>
<td class="mono col-pubkey">${truncate(n.public_key, 16)}</td>
<td><span class="badge" style="background:${roleColor}20;color:${roleColor}">${n.role}</span></td>
<td class="${lastSeenClass}">${timeAgo(n.last_heard || n.last_seen)}</td>
<td>${n.advert_count || 0}</td>

View File

@@ -157,19 +157,19 @@
<div class="obs-charts" style="display:grid;grid-template-columns:repeat(auto-fit,minmax(400px,1fr));gap:16px">
<div class="chart-card" style="padding:12px">
<h3 style="margin:0 0 8px;font-size:0.95em">Packets Over Time</h3>
<canvas id="obsTimeChart"></canvas>
<canvas id="obsTimeChart" role="img" aria-label="Packets over time chart"></canvas>
</div>
<div class="chart-card" style="padding:12px">
<h3 style="margin:0 0 8px;font-size:0.95em">Packet Types</h3>
<div style="max-width:280px;margin:0 auto"><canvas id="obsTypeChart"></canvas></div>
<div style="max-width:280px;margin:0 auto"><canvas id="obsTypeChart" role="img" aria-label="Packet types chart"></canvas></div>
</div>
<div class="chart-card" style="padding:12px">
<h3 style="margin:0 0 8px;font-size:0.95em">Unique Nodes Heard</h3>
<canvas id="obsNodesChart"></canvas>
<canvas id="obsNodesChart" role="img" aria-label="Unique nodes heard chart"></canvas>
</div>
<div class="chart-card" style="padding:12px">
<h3 style="margin:0 0 8px;font-size:0.95em">SNR Distribution</h3>
<canvas id="obsSnrChart"></canvas>
<canvas id="obsSnrChart" role="img" aria-label="SNR distribution chart"></canvas>
</div>
</div>
<div style="margin-top:20px">
@@ -299,12 +299,12 @@
const el = document.getElementById('obsRecentPackets');
if (!el || !packets.length) { if (el) el.innerHTML = '<div class="text-muted">No recent packets.</div>'; return; }
el.innerHTML = `<table class="data-table" style="font-size:0.85em">
<thead><tr><th>Time</th><th>Type</th><th>Hash</th><th>SNR</th><th>RSSI</th><th>Hops</th></tr></thead>
<thead><tr><th scope="col">Time</th><th scope="col">Type</th><th scope="col">Hash</th><th scope="col">SNR</th><th scope="col">RSSI</th><th scope="col">Hops</th></tr></thead>
<tbody>${packets.map(p => {
const decoded = typeof p.decoded_json === 'string' ? JSON.parse(p.decoded_json) : (p.decoded_json || {});
const hops = typeof p.path_json === 'string' ? JSON.parse(p.path_json) : (p.path_json || []);
const typeName = PAYLOAD_LABELS[p.payload_type] || 'Type ' + p.payload_type;
return `<tr style="cursor:pointer" onclick="location.hash='#/packets/${p.hash || p.id}'">
return `<tr style="cursor:pointer" tabindex="0" role="row" data-action="navigate" data-value="#/packets/${p.hash || p.id}" onclick="location.hash='#/packets/${p.hash || p.id}'">
<td>${timeAgo(p.timestamp)}</td>
<td>${typeName}</td>
<td class="mono" style="font-size:0.85em">${(p.hash || '').substring(0, 10)}</td>
@@ -314,6 +314,15 @@
</tr>`;
}).join('')}</tbody>
</table>`;
// #209 — Keyboard accessibility for recent packet rows
el.addEventListener('keydown', function (e) {
var row = e.target.closest('tr[data-action="navigate"]');
if (!row) return;
if (e.key !== 'Enter' && e.key !== ' ') return;
e.preventDefault();
location.hash = row.dataset.value;
});
}
registerPage('observer-detail', { init, destroy });

View File

@@ -25,6 +25,16 @@
app.addEventListener('click', function (e) {
var btn = e.target.closest('[data-action]');
if (btn && btn.dataset.action === 'obs-refresh') loadObservers();
var row = e.target.closest('tr[data-action="navigate"]');
if (row) location.hash = row.dataset.value;
});
// #209 — Keyboard accessibility for observer rows
app.addEventListener('keydown', function (e) {
var row = e.target.closest('tr[data-action="navigate"]');
if (!row) return;
if (e.key !== 'Enter' && e.key !== ' ') return;
e.preventDefault();
location.hash = row.dataset.value;
});
// Auto-refresh every 30s
refreshTimer = setInterval(loadObservers, 30000);
@@ -113,13 +123,13 @@
<div class="obs-table-scroll"><table class="data-table obs-table" id="obsTable">
<caption class="sr-only">Observer status and statistics</caption>
<thead><tr>
<th>Status</th><th>Name</th><th>Region</th><th>Last Seen</th>
<th>Packets</th><th>Packets/Hour</th><th>Uptime</th>
<th scope="col">Status</th><th scope="col">Name</th><th scope="col">Region</th><th scope="col">Last Seen</th>
<th scope="col">Packets</th><th scope="col">Packets/Hour</th><th scope="col">Uptime</th>
</tr></thead>
<tbody>${filtered.map(o => {
const h = healthStatus(o.last_seen);
const shape = h.cls === 'health-green' ? '●' : h.cls === 'health-yellow' ? '▲' : '✕';
return `<tr style="cursor:pointer" onclick="location.hash='#/observers/${encodeURIComponent(o.id)}'">
return `<tr style="cursor:pointer" tabindex="0" role="row" data-action="navigate" data-value="#/observers/${encodeURIComponent(o.id)}" onclick="location.hash='#/observers/${encodeURIComponent(o.id)}'">
<td><span class="health-dot ${h.cls}" title="${h.label}">${shape}</span> ${h.label}</td>
<td class="mono">${o.name || o.id}</td>
<td>${o.iata ? `<span class="badge-region">${o.iata}</span>` : '—'}</td>
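The #209 delegated keydown handlers in observer-detail.js and observers.js repeat the same Enter/Space check. The decision logic can be factored into a small pure helper (hypothetical sketch, not code from this repo; `row` stands in for the matched `tr[data-action="navigate"]` element):

```javascript
// Return the navigation target for a delegated keydown on a table row,
// or null when the key or row should not trigger navigation.
// Mirrors the pattern above: Enter or Space on tr[data-action="navigate"].
function navTargetForKey(key, row) {
  if (!row || !row.dataset || row.dataset.action !== 'navigate') return null;
  if (key !== 'Enter' && key !== ' ') return null;
  return row.dataset.value;
}
```

Keeping the check pure makes it trivially unit-testable without a DOM, while the event listeners stay thin wrappers around it.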

View File

@@ -501,6 +501,7 @@
<div class="filter-group" style="flex:1;margin-bottom:8px">
<input type="text" id="packetFilterInput" class="packet-filter-input"
placeholder='Filter: type == Advert && snr > 5 · payload.name contains "Gilroy"'
aria-label="Packet filter expression"
style="width:100%;padding:6px 10px;border:1px solid var(--border);border-radius:6px;font-family:var(--mono);font-size:13px;background:var(--input-bg);color:var(--text)">
<div id="packetFilterError" style="color:var(--status-red);font-size:11px;margin-top:2px;display:none"></div>
<div id="packetFilterCount" style="color:var(--text-muted);font-size:11px;margin-top:2px;display:none"></div>
@@ -528,7 +529,7 @@
<button class="btn" id="fMyNodes" title="Show only packets from your favorited/claimed nodes">★ My Nodes</button>
</div>
<div class="filter-group">
<select id="fTimeWindow" class="filter-select">
<select id="fTimeWindow" class="filter-select" aria-label="Time window filter">
<option value="15">Last 15 min</option>
<option value="30">Last 30 min</option>
<option value="60">Last 1 hour</option>
@@ -559,8 +560,8 @@
</div>
<table class="data-table" id="pktTable">
<thead><tr>
<th></th><th class="col-region">Region</th><th class="col-time">Time</th><th class="col-hash">Hash</th><th class="col-size">Size</th>
<th class="col-type">Type</th><th class="col-observer">Observer</th><th class="col-path">Path</th><th class="col-rpt">Rpt</th><th class="col-details">Details</th>
<th scope="col"></th><th scope="col" class="col-region">Region</th><th scope="col" class="col-time">Time</th><th scope="col" class="col-hash">Hash</th><th scope="col" class="col-size">Size</th>
<th scope="col" class="col-type">Type</th><th scope="col" class="col-observer">Observer</th><th scope="col" class="col-path">Path</th><th scope="col" class="col-rpt">Rpt</th><th scope="col" class="col-details">Details</th>
</tr></thead>
<tbody id="pktBody"></tbody>
</table>
@@ -1524,7 +1525,7 @@
}
return `<table class="field-table">
<thead><tr><th>Offset</th><th>Field</th><th>Value</th><th>Description</th></tr></thead>
<thead><tr><th scope="col">Offset</th><th scope="col">Field</th><th scope="col">Value</th><th scope="col">Description</th></tr></thead>
<tbody>${rows}</tbody>
</table>`;
}

View File

@@ -5,7 +5,7 @@
let interval = null;
async function render(app) {
app.innerHTML = '<div style="height:100%;overflow-y:auto;padding:16px 24px;"><h2>⚡ Performance Dashboard</h2><div id="perfContent">Loading...</div></div>';
app.innerHTML = '<div id="perfWrapper" style="height:100%;overflow-y:auto;padding:16px 24px;"><h2>⚡ Performance Dashboard</h2><div id="perfContent">Loading...</div></div>';
await refresh();
}
@@ -125,7 +125,7 @@
const eps = Object.entries(server.endpoints);
if (eps.length) {
html += '<h3>Server Endpoints (sorted by total time)</h3>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th>Endpoint</th><th>Count</th><th>Avg</th><th>P50</th><th>P95</th><th>Max</th><th>Total</th></tr></thead><tbody>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th scope="col">Endpoint</th><th scope="col">Count</th><th scope="col">Avg</th><th scope="col">P50</th><th scope="col">P95</th><th scope="col">Max</th><th scope="col">Total</th></tr></thead><tbody>';
for (const [path, s] of eps) {
const total = Math.round(s.count * s.avgMs);
const cls = s.p95Ms > 200 ? ' class="perf-slow"' : s.p95Ms > 50 ? ' class="perf-warn"' : '';
@@ -137,7 +137,7 @@
// Client API calls
if (client && client.endpoints.length) {
html += '<h3>Client API Calls (this session)</h3>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th>Endpoint</th><th>Count</th><th>Avg</th><th>Max</th><th>Total</th></tr></thead><tbody>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th scope="col">Endpoint</th><th scope="col">Count</th><th scope="col">Avg</th><th scope="col">Max</th><th scope="col">Total</th></tr></thead><tbody>';
for (const s of client.endpoints) {
const cls = s.maxMs > 500 ? ' class="perf-slow"' : s.avgMs > 200 ? ' class="perf-warn"' : '';
html += `<tr${cls}><td><code>${s.path}</code></td><td>${s.count}</td><td>${s.avgMs}ms</td><td>${s.maxMs}ms</td><td>${s.totalMs}ms</td></tr>`;
@@ -148,7 +148,7 @@
// Slow queries
if (server.slowQueries.length) {
html += '<h3>Recent Slow Queries (&gt;100ms)</h3>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th>Time</th><th>Path</th><th>Duration</th><th>Status</th></tr></thead><tbody>';
html += '<div style="overflow-x:auto"><table class="perf-table"><thead><tr><th scope="col">Time</th><th scope="col">Path</th><th scope="col">Duration</th><th scope="col">Status</th></tr></thead><tbody>';
for (const q of server.slowQueries.slice().reverse()) {
html += `<tr class="perf-slow"><td>${new Date(q.time).toLocaleTimeString()}</td><td><code>${q.path}</code></td><td>${q.ms}ms</td><td>${q.status}</td></tr>`;
}
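The P50/P95 columns in the perf tables above imply a percentile helper on the server side. A minimal nearest-rank sketch (the name, signature, and method are assumptions for illustration, not taken from this repo):

```javascript
// Nearest-rank percentile over an array of millisecond samples.
// p is in [0, 100]; returns 0 for an empty sample set.
function percentile(samples, p) {
  if (!samples.length) return 0;
  const sorted = samples.slice().sort((a, b) => a - b); // numeric ascending
  const rank = Math.ceil((p / 100) * sorted.length);    // 1-based nearest rank
  const idx = Math.min(sorted.length - 1, Math.max(0, rank - 1));
  return sorted[idx];
}
```

Nearest-rank always returns an observed sample (no interpolation), which is usually what you want for latency dashboards like the one above.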

View File

@@ -1205,7 +1205,7 @@ button.ch-item.ch-item-encrypted .ch-badge { filter: grayscale(0.6); }
/* Hide low-value columns on mobile */
@media (max-width: 640px) {
.col-region, .col-rpt, .col-size { display: none; }
.col-region, .col-rpt, .col-size, .col-pubkey { display: none; }
}
/* Clickable hop links */
@@ -1389,6 +1389,12 @@ tr[data-hops]:hover { background: rgba(59,130,246,0.1); }
/* #20 — Observers table horizontal scroll on mobile */
.obs-table-scroll { overflow-x: auto; -webkit-overflow-scrolling: touch; }
.obs-table-scroll .obs-table { min-width: 640px; }
/* #206 — Analytics/Compare tables scroll wrappers on mobile */
.analytics-table-scroll { overflow-x: auto; -webkit-overflow-scrolling: touch; }
.analytics-table-scroll .analytics-table,
.analytics-table-scroll .analytics-peer-table,
.analytics-table-scroll .compare-table { min-width: 480px; }
@media (max-width: 640px) {
.spark-bar { min-width: 60px; width: auto; }
}
@@ -1532,6 +1538,14 @@ tr[data-hops]:hover { background: rgba(59,130,246,0.1); }
.perf-table .perf-warn { background: rgba(251, 191, 36, 0.06); }
.perf-table .perf-warn td { color: var(--status-yellow); }
/* #204 — Perf page responsive */
@media (max-width: 640px) {
#perfWrapper { padding: 12px !important; }
.perf-card { min-width: 0; flex: 1 1 calc(50% - 8px); }
.perf-table { font-size: 11px; }
.perf-table th, .perf-table td { padding: 4px 6px; }
}
/* ─── Region filter bar ─── */
.region-filter-bar { display: flex; flex-wrap: wrap; gap: 6px; padding: 8px 0; }
.region-filter-container { margin: 0; padding: 0; display: inline-flex; align-items: center; }