Compare commits

..

12 Commits

Author SHA1 Message Date
you
1a87e7410e v2.3.0: Packet Deduplication — normalized storage with transmissions + observations
CI: add setup-node to deploy workflow
2026-03-20 23:02:58 +00:00
you
91a6a2c525 fix: migration handles concurrent dual-write with INSERT OR IGNORE 2026-03-20 22:52:12 +00:00
you
e5a609bbfc Merge dedup-normalize: packet deduplication (M1-M5)
Normalized packet storage: transmissions + observations tables.
- M1: Schema migration script
- M2: Dual-write ingest
- M3: In-memory store restructured around transmissions
- M4: API responses include observation_count, totalTransmissions
- M5: Frontend badges, deeplinks, dedup-aware UI
2026-03-20 22:49:13 +00:00
you
95b59d1792 fix: recent packets deeplink uses route path not query param 2026-03-20 22:34:00 +00:00
you
e7aa4246ac fix: live node panel deeplinks — full detail, observer names, recent packets
- Full Detail links to #/nodes/<pubkey> (was #/nodes?selected= which just showed list)
- Heard By observer names link to #/observers/<id>
- Recent Packets link to #/packets?hash=<hash>
2026-03-20 22:17:13 +00:00
you
f1aa6caf93 fix: packet expand shows observations, heard-by uses correct field name
- pktToggleGroup fetches ?expand=observations and maps them as children
- Live page heard-by uses o.packetCount (was o.count → undefined)
2026-03-20 22:02:54 +00:00
you
a882aae681 M5: Frontend updates for dedup — observation_count badges, totalTransmissions
- packets.js: Show observation_count badge (👁 N) on grouped rows
- nodes.js: Use totalTransmissions (fallback totalPackets), show observation badges on recent packets
- home.js: Use totalTransmissions for network stats
- node-analytics.js: Use totalTransmissions for throughput display
- analytics.js: Use totalTransmissions for overview stats and node rankings
- live.js: Use totalTransmissions in node detail, show observation badges in feed and recent packets
- style.css: Add .badge-obs style for observation count badges
- index.html: Bump cache busters on all changed JS/CSS files

All changes have backward compat fallbacks to totalPackets.
2026-03-20 21:31:10 +00:00
you
aa35164252 M4: API response changes for dedup-normalize
- GET /api/packets: returns transmissions with observation_count, strip
  observations[] by default (use ?expand=observations to include)
- GET /api/packets/🆔 includes observation_count and observations[]
- GET /api/nodes/:pubkey/health: stats.totalTransmissions + totalObservations
  (totalPackets kept for backward compat)
- GET /api/nodes/bulk-health: same transmission/observation split
- WebSocket broadcast: includes observation_count
- db.js getStats(): adds totalTransmissions count
- All backward-compatible: old field names preserved alongside new ones
2026-03-20 20:49:34 +00:00
you
84f33aef7b M3: Restructure in-memory store around transmissions
- load() reads from transmissions JOIN observations (with legacy fallback)
- byHash now maps hash → single transmission object (1:1)
- byNode maps pubkey → [transmissions] (deduped, no inflated observations)
- byTransmission is the primary data structure
- byId maps observation IDs for backward-compat packet detail links
- byObserver still maps observer_id → [observations]
- getSiblings() returns observations from transmission
- findPacketsForNode() returns unique transmissions
- query()/queryGrouped() work with transmission-centric model
- All returned objects maintain backward-compatible fields
- SQLite-only fallback path (NO_MEMORY_STORE=1) unchanged
- Tested: 11.6K transmissions from 37.5K observations (3.2x dedup)
2026-03-20 20:44:32 +00:00
you
baa60cac0f M2: Dual-write ingest to transmissions/observations tables
- Add transmissions and observations schema to db.js init
- Add insertTransmission() function: upsert transmission by hash,
  always insert observation row
- All 6 pktStore.insert() call sites in server.js now also call
  db.insertTransmission() with try/catch (non-fatal on error)
- packet-store.js: add byTransmission Map index (hash → transmission
  with observations array) for future M3 query migration
- Existing insertPacket() and all read paths unchanged
2026-03-20 20:29:03 +00:00
you
d7e415daa7 fix: raw_hex NOT NULL in transmissions schema — deleted 4 junk test rows 2026-03-20 20:24:13 +00:00
you
2c6148fd2d Add dedup migration script (Milestone 1)
Creates transmissions and observations tables from existing packets table.
- Groups packets by hash → 1 transmission per unique hash
- Creates 1 observation per original packet row with FK to transmission
- Idempotent: drops and recreates new tables on each run
- Does NOT modify the original packets table
- Prints stats and verifies counts match

Tested on test DB: 33813 packets → 11530 transmissions (2.93x dedup ratio)
2026-03-20 20:22:30 +00:00
14 changed files with 732 additions and 162 deletions

View File

@@ -14,6 +14,10 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: '22'
- name: Validate JS
run: sh scripts/validate.sh

View File

@@ -1,4 +1,32 @@
# Changelog
## [2.3.0] - 2026-03-20
### Added
- **Packet Deduplication**: Normalized storage with `transmissions` and `observations` tables — packets seen by multiple observers are stored once with linked observation records
- **Observation count badges**: Packets page shows 👁 badge indicating how many observers saw each transmission
- **`?expand=observations`**: API query param to include full observation details on packet responses
- **`totalTransmissions` / `totalObservations`**: Health and analytics APIs return both deduped and raw counts
- **Migration script**: `scripts/migrate-dedup.js` for converting existing packet data to normalized schema
- **Live map deeplinks**: Node detail panel links to full node detail, observer detail, and filtered packets
- **CI validation**: `setup-node` added to deploy workflow for JS syntax checking
### Changed
- In-memory packet store restructured around transmissions (primary) with observation indexes
- Packets API returns unique transmissions by default (was returning inflated observation rows)
- Home page shows "Transmissions" instead of "Packets" for network stats
- Analytics overview uses transmission counts for throughput metrics
- Node health stats include `totalTransmissions` alongside legacy `totalPackets`
- WebSocket broadcasts include `observation_count`
### Fixed
- Packet expand showing only the collapsed row instead of individual observations
- Live page "Heard By" showing "undefined pkts" (wrong field name)
- Recent packets deeplink using query param instead of route path
- Migration script handling concurrent dual-write during live deployment
### Performance
- **8.19× dedup ratio on production** (117K observations → 14K transmissions)
- RAM usage reduced proportionally — store loads transmissions, not inflated observations
## v2.1.1 — Multi-Broker MQTT & Observer Detail (2026-03-20)

97
db.js
View File

@@ -71,6 +71,41 @@ db.exec(`
CREATE INDEX IF NOT EXISTS idx_packets_payload_type ON packets(payload_type);
CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen);
CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen);
CREATE TABLE IF NOT EXISTS transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
hash TEXT NOT NULL,
observer_id TEXT,
observer_name TEXT,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash);
CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen);
CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type);
CREATE INDEX IF NOT EXISTS idx_observations_hash ON observations(hash);
CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id);
CREATE INDEX IF NOT EXISTS idx_observations_observer_id ON observations(observer_id);
CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp);
`);
// --- Migrations for existing DBs ---
@@ -144,6 +179,16 @@ const stmts = {
countNodes: db.prepare(`SELECT COUNT(*) as count FROM nodes`),
countObservers: db.prepare(`SELECT COUNT(*) as count FROM observers`),
countRecentPackets: db.prepare(`SELECT COUNT(*) as count FROM packets WHERE timestamp > ?`),
getTransmissionByHash: db.prepare(`SELECT id, first_seen FROM transmissions WHERE hash = ?`),
insertTransmission: db.prepare(`
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
VALUES (@raw_hex, @hash, @first_seen, @route_type, @payload_type, @payload_version, @decoded_json)
`),
updateTransmissionFirstSeen: db.prepare(`UPDATE transmissions SET first_seen = @first_seen WHERE id = @id`),
insertObservation: db.prepare(`
INSERT INTO observations (transmission_id, hash, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp)
VALUES (@transmission_id, @hash, @observer_id, @observer_name, @direction, @snr, @rssi, @score, @path_json, @timestamp)
`),
};
// --- Helper functions ---
@@ -168,6 +213,49 @@ function insertPacket(data) {
return stmts.insertPacket.run(d).lastInsertRowid;
}
function insertTransmission(data) {
const hash = data.hash;
if (!hash) return null; // Can't deduplicate without a hash
const timestamp = data.timestamp || new Date().toISOString();
let transmissionId;
const existing = stmts.getTransmissionByHash.get(hash);
if (existing) {
transmissionId = existing.id;
// Update first_seen if this observation is earlier
if (timestamp < existing.first_seen) {
stmts.updateTransmissionFirstSeen.run({ id: transmissionId, first_seen: timestamp });
}
} else {
const result = stmts.insertTransmission.run({
raw_hex: data.raw_hex || '',
hash,
first_seen: timestamp,
route_type: data.route_type ?? null,
payload_type: data.payload_type ?? null,
payload_version: data.payload_version ?? null,
decoded_json: data.decoded_json || null,
});
transmissionId = result.lastInsertRowid;
}
const obsResult = stmts.insertObservation.run({
transmission_id: transmissionId,
hash,
observer_id: data.observer_id || null,
observer_name: data.observer_name || null,
direction: data.direction || null,
snr: data.snr ?? null,
rssi: data.rssi ?? null,
score: data.score ?? null,
path_json: data.path_json || null,
timestamp,
});
return { transmissionId, observationId: obsResult.lastInsertRowid };
}
function insertPath(packetId, hops) {
const tx = db.transaction((hops) => {
for (let i = 0; i < hops.length; i++) {
@@ -276,8 +364,15 @@ function getObservers() {
function getStats() {
const oneHourAgo = new Date(Date.now() - 3600000).toISOString();
// Try to get transmission count from normalized schema
let totalTransmissions = null;
try {
totalTransmissions = db.prepare('SELECT COUNT(*) as count FROM transmissions').get().count;
} catch {}
return {
totalPackets: stmts.countPackets.get().count,
totalTransmissions,
totalObservations: stmts.countPackets.get().count, // legacy packets = observations
totalNodes: stmts.countNodes.get().count,
totalObservers: stmts.countObservers.get().count,
packetsLastHour: stmts.countRecentPackets.get(oneHourAgo).count,
@@ -557,4 +652,4 @@ function getNodeAnalytics(pubkey, days) {
};
}
module.exports = { db, insertPacket, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics };
module.exports = { db, insertPacket, insertTransmission, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics };

View File

@@ -1,8 +1,9 @@
'use strict';
/**
* In-memory packet store — loads all packets from SQLite on startup,
* In-memory packet store — loads transmissions + observations from SQLite on startup,
* serves reads from RAM, writes to both RAM + SQLite.
* M3: Restructured around transmissions (deduped by hash) with observations.
* Caps memory at configurable limit (default 1GB).
*/
class PacketStore {
@@ -16,16 +17,22 @@ class PacketStore {
// SQLite-only mode: skip RAM loading, all reads go to DB
this.sqliteOnly = process.env.NO_MEMORY_STORE === '1';
// Core storage: array sorted by timestamp DESC (newest first)
// Primary storage: transmissions sorted by first_seen DESC (newest first)
// Each transmission looks like a packet for backward compat
this.packets = [];
// Indexes
this.byId = new Map();
this.byHash = new Map(); // hash → [packet, ...]
this.byObserver = new Map(); // observer_id → [packet, ...]
this.byNode = new Map(); // pubkey → [packet, ...]
this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links)
this.byHash = new Map(); // hash → transmission object (1:1)
this.byObserver = new Map(); // observer_id → [observation objects]
this.byNode = new Map(); // pubkey → [transmission objects] (deduped)
this.byTransmission = new Map(); // hash → transmission object (same refs as byHash)
// Track which hashes are indexed per node pubkey (avoid dupes in byNode)
this._nodeHashIndex = new Map(); // pubkey → Set<hash>
this.loaded = false;
this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 };
this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 };
}
/** Load all packets from SQLite into memory */
@@ -35,61 +42,310 @@ class PacketStore {
this.loaded = true;
return this;
}
const t0 = Date.now();
// Check if normalized schema exists
const hasTransmissions = this.db.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'"
).get();
if (hasTransmissions) {
this._loadNormalized();
} else {
this._loadLegacy();
}
this.stats.totalLoaded = this.packets.length;
this.loaded = true;
const elapsed = Date.now() - t0;
console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
return this;
}
/** Load from normalized transmissions + observations tables */
_loadNormalized() {
const rows = this.db.prepare(`
SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id AS observation_id, o.observer_id, o.observer_name, o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
ORDER BY t.first_seen DESC, o.timestamp DESC
`).all();
for (const row of rows) {
if (this.packets.length >= this.maxPackets && !this.byTransmission.has(row.hash)) break;
let tx = this.byTransmission.get(row.hash);
if (!tx) {
tx = {
id: row.transmission_id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.first_seen,
timestamp: row.first_seen,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
// Filled from first observation for backward compat
observer_id: null,
observer_name: null,
snr: null,
rssi: null,
path_json: null,
direction: null,
};
this.byTransmission.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.push(tx);
this._indexByNode(tx);
}
if (row.observation_id != null) {
const obs = {
id: row.observation_id,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.obs_timestamp,
// Carry transmission fields for backward compat
hash: row.hash,
raw_hex: row.raw_hex,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
route_type: row.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
// Fill first observation data into transmission for backward compat
if (tx.observer_id == null && obs.observer_id) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
tx.direction = obs.direction;
}
// byId maps observation IDs for packet detail links
this.byId.set(obs.id, obs);
// byObserver
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
}
}
}
/** Fallback: load from legacy packets table */
_loadLegacy() {
const rows = this.db.prepare(
'SELECT * FROM packets ORDER BY timestamp DESC'
).all();
for (const row of rows) {
if (this.packets.length >= this.maxPackets) break;
this._index(row);
this.packets.push(row);
this._indexLegacy(row);
}
this.stats.totalLoaded = this.packets.length;
this.loaded = true;
const elapsed = Date.now() - t0;
console.log(`[PacketStore] Loaded ${this.packets.length} packets in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
return this;
}
/** Index a packet into all lookup maps */
_index(pkt) {
this.byId.set(pkt.id, pkt);
if (pkt.hash) {
if (!this.byHash.has(pkt.hash)) this.byHash.set(pkt.hash, []);
this.byHash.get(pkt.hash).push(pkt);
/** Index a legacy packet row (old flat structure) — builds transmission + observation */
_indexLegacy(pkt) {
let tx = this.byTransmission.get(pkt.hash);
if (!tx) {
tx = {
id: pkt.id,
raw_hex: pkt.raw_hex,
hash: pkt.hash,
first_seen: pkt.timestamp,
timestamp: pkt.timestamp,
route_type: pkt.route_type,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
observations: [],
observation_count: 0,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
snr: pkt.snr,
rssi: pkt.rssi,
path_json: pkt.path_json,
direction: pkt.direction,
};
this.byTransmission.set(pkt.hash, tx);
this.byHash.set(pkt.hash, tx);
this.packets.push(tx);
this._indexByNode(tx);
}
if (pkt.timestamp < tx.first_seen) {
tx.first_seen = pkt.timestamp;
tx.timestamp = pkt.timestamp;
}
const obs = {
id: pkt.id,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
direction: pkt.direction,
snr: pkt.snr,
rssi: pkt.rssi,
score: pkt.score,
path_json: pkt.path_json,
timestamp: pkt.timestamp,
hash: pkt.hash,
raw_hex: pkt.raw_hex,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
route_type: pkt.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
this.byId.set(pkt.id, obs);
if (pkt.observer_id) {
if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []);
this.byObserver.get(pkt.observer_id).push(pkt);
this.byObserver.get(pkt.observer_id).push(obs);
}
// Index by node pubkeys mentioned in decoded_json
this._indexByNode(pkt);
this.stats.totalObservations++;
}
/** Extract node pubkeys/names from decoded_json and index */
_indexByNode(pkt) {
if (!pkt.decoded_json) return;
/** Extract node pubkeys from decoded_json and index transmission in byNode */
_indexByNode(tx) {
if (!tx.decoded_json) return;
try {
const decoded = JSON.parse(pkt.decoded_json);
const decoded = JSON.parse(tx.decoded_json);
const keys = new Set();
if (decoded.pubKey) keys.add(decoded.pubKey);
if (decoded.destPubKey) keys.add(decoded.destPubKey);
if (decoded.srcPubKey) keys.add(decoded.srcPubKey);
for (const k of keys) {
if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set());
if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed
this._nodeHashIndex.get(k).add(tx.hash);
if (!this.byNode.has(k)) this.byNode.set(k, []);
this.byNode.get(k).push(pkt);
this.byNode.get(k).push(tx);
}
} catch {}
}
/** Remove oldest transmissions when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
const old = this.packets.pop();
this.byHash.delete(old.hash);
this.byTransmission.delete(old.hash);
// Remove observations from byId and byObserver
for (const obs of old.observations) {
this.byId.delete(obs.id);
if (obs.observer_id && this.byObserver.has(obs.observer_id)) {
const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id);
if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id);
}
}
// Skip node index cleanup (expensive, low value)
this.stats.evicted++;
}
}
/** Insert a new packet (to both memory and SQLite) */
insert(packetData) {
const id = this.dbModule.insertPacket(packetData);
const row = this.dbModule.getPacket(id);
if (row && !this.sqliteOnly) {
// Update or create transmission in memory
let tx = this.byTransmission.get(row.hash);
if (!tx) {
tx = {
id: row.id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.timestamp,
timestamp: row.timestamp,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
observer_id: row.observer_id,
observer_name: row.observer_name,
snr: row.snr,
rssi: row.rssi,
path_json: row.path_json,
direction: row.direction,
};
this.byTransmission.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.unshift(tx); // newest first
this._indexByNode(tx);
} else {
// Update first_seen if earlier
if (row.timestamp < tx.first_seen) {
tx.first_seen = row.timestamp;
tx.timestamp = row.timestamp;
}
}
// Add observation
const obs = {
id: row.id,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.timestamp,
hash: row.hash,
raw_hex: row.raw_hex,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
route_type: row.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
// Update transmission's display fields if this is first observation
if (tx.observations.length === 1) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
}
this.byId.set(obs.id, obs);
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
this._evict();
this.stats.inserts++;
}
return id;
}
/**
* Find ALL packets referencing a node — by pubkey index + name + pubkey text search.
* Single source of truth for "get packets for node X".
* Returns unique transmissions (deduped).
* @param {string} nodeIdOrName - pubkey or friendly name
* @param {Array} [fromPackets] - packet array to filter (defaults to this.packets)
* @returns {{ packets: Array, pubkey: string, nodeName: string }}
@@ -104,49 +360,24 @@ class PacketStore {
if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; }
} catch {}
// Combine: index hits + text search by both name and pubkey
// Combine: index hits + text search
const indexed = this.byNode.get(pubkey);
const idSet = indexed ? new Set(indexed.map(p => p.id)) : new Set();
const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set();
const source = fromPackets || this.packets;
const packets = source.filter(p =>
idSet.has(p.id) ||
(p.decoded_json && (p.decoded_json.includes(nodeName) || p.decoded_json.includes(pubkey)))
const packets = source.filter(t =>
hashSet.has(t.hash) ||
(t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey)))
);
return { packets, pubkey, nodeName };
}
/** Remove oldest packets when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
const old = this.packets.pop();
this.byId.delete(old.id);
// Remove from hash index
if (old.hash && this.byHash.has(old.hash)) {
const arr = this.byHash.get(old.hash).filter(p => p.id !== old.id);
if (arr.length) this.byHash.set(old.hash, arr); else this.byHash.delete(old.hash);
}
// Remove from observer index
if (old.observer_id && this.byObserver.has(old.observer_id)) {
const arr = this.byObserver.get(old.observer_id).filter(p => p.id !== old.id);
if (arr.length) this.byObserver.set(old.observer_id, arr); else this.byObserver.delete(old.observer_id);
}
// Skip node index cleanup for eviction (expensive, low value)
this.stats.evicted++;
}
}
/** Insert a new packet (to both memory and SQLite) */
insert(packetData) {
const id = this.dbModule.insertPacket(packetData);
const row = this.dbModule.getPacket(id);
if (row) {
this.packets.unshift(row); // newest first
this._index(row);
this._evict();
this.stats.inserts++;
}
return id;
/** Count transmissions and observations for a node */
countForNode(pubkey) {
const txs = this.byNode.get(pubkey) || [];
let observations = 0;
for (const tx of txs) observations += tx.observation_count;
return { transmissions: txs.length, observations };
}
/** Query packets with filters — all from memory (or SQLite in fallback mode) */
@@ -159,9 +390,11 @@ class PacketStore {
// Use indexes for single-key filters when possible
if (hash && !type && !route && !region && !observer && !since && !until && !node) {
results = this.byHash.get(hash) || [];
const tx = this.byHash.get(hash);
results = tx ? [tx] : [];
} else if (observer && !type && !route && !region && !hash && !since && !until && !node) {
results = this.byObserver.get(observer) || [];
// For observer filter, find unique transmissions where any observation matches
results = this._transmissionsForObserver(observer);
} else if (node && !type && !route && !region && !observer && !hash && !since && !until) {
results = this.findPacketsForNode(node).packets;
} else {
@@ -174,18 +407,22 @@ class PacketStore {
const r = Number(route);
results = results.filter(p => p.route_type === r);
}
if (observer) results = results.filter(p => p.observer_id === observer);
if (hash) results = results.filter(p => p.hash === hash);
if (observer) results = this._transmissionsForObserver(observer, results);
if (hash) {
const tx = this.byHash.get(hash);
results = tx ? results.filter(p => p.hash === hash) : [];
}
if (since) results = results.filter(p => p.timestamp > since);
if (until) results = results.filter(p => p.timestamp < until);
if (region) {
// Need to look up observers for this region
const regionObservers = new Set();
try {
const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region);
obs.forEach(o => regionObservers.add(o.id));
} catch {}
results = results.filter(p => regionObservers.has(p.observer_id));
results = results.filter(p =>
p.observations.some(o => regionObservers.has(o.observer_id))
);
}
if (node) {
results = this.findPacketsForNode(node, results).packets;
@@ -209,52 +446,52 @@ class PacketStore {
return { packets: paginated, total };
}
/** Query with groupByHash — aggregate packets by content hash */
/** Find unique transmissions that have at least one observation from given observer */
_transmissionsForObserver(observerId, fromTransmissions) {
if (fromTransmissions) {
return fromTransmissions.filter(tx =>
tx.observations.some(o => o.observer_id === observerId)
);
}
// Use byObserver index: get observations, then unique transmissions
const obs = this.byObserver.get(observerId) || [];
const seen = new Set();
const result = [];
for (const o of obs) {
if (!seen.has(o.hash)) {
seen.add(o.hash);
const tx = this.byTransmission.get(o.hash);
if (tx) result.push(tx);
}
}
return result;
}
/** Query with groupByHash — now trivial since packets ARE transmissions */
queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) {
this.stats.queries++;
if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node });
// Get filtered results first
// Get filtered transmissions
const { packets: filtered, total: filteredTotal } = this.query({
limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node
});
// Group by hash
const groups = new Map();
for (const p of filtered) {
const h = p.hash || p.id.toString();
if (!groups.has(h)) {
groups.set(h, {
hash: p.hash,
observer_count: new Set(),
count: 0,
latest: p.timestamp,
observer_id: p.observer_id,
observer_name: p.observer_name,
path_json: p.path_json,
payload_type: p.payload_type,
raw_hex: p.raw_hex,
decoded_json: p.decoded_json,
});
}
const g = groups.get(h);
g.count++;
if (p.observer_id) g.observer_count.add(p.observer_id);
if (p.timestamp > g.latest) {
g.latest = p.timestamp;
}
// Keep longest path
if (p.path_json && (!g.path_json || p.path_json.length > g.path_json.length)) {
g.path_json = p.path_json;
g.raw_hex = p.raw_hex;
}
}
// Sort by latest DESC, paginate
const sorted = [...groups.values()]
.map(g => ({ ...g, observer_count: g.observer_count.size }))
.sort((a, b) => b.latest.localeCompare(a.latest));
// Already grouped by hash — just format for backward compat
const sorted = filtered.map(tx => ({
hash: tx.hash,
count: tx.observation_count,
observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size,
latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp,
observer_id: tx.observer_id,
observer_name: tx.observer_name,
path_json: tx.path_json,
payload_type: tx.payload_type,
raw_hex: tx.raw_hex,
decoded_json: tx.decoded_json,
observation_count: tx.observation_count,
})).sort((a, b) => b.latest.localeCompare(a.latest));
const total = sorted.length;
const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit));
@@ -274,25 +511,26 @@ class PacketStore {
return results.reverse();
}
/** Get a single packet by ID */
/** Get a single packet by ID — checks observation IDs first (backward compat) */
getById(id) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE id = ?').get(id) || null;
return this.byId.get(id) || null;
}
/** Get all siblings of a packet (same hash) */
/** Get all siblings of a packet (same hash) — returns observations array */
getSiblings(hash) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE hash = ? ORDER BY timestamp DESC').all(hash);
return this.byHash.get(hash) || [];
const tx = this.byTransmission.get(hash);
return tx ? tx.observations : [];
}
/** Get all packets (raw array reference — do not mutate) */
/** Get all transmissions (backward compat — returns packets array) */
all() {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all();
return this.packets;
}
/** Get all packets matching a filter function */
/** Get all transmissions matching a filter function */
filter(fn) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all().filter(fn);
return this.packets.filter(fn);
@@ -311,6 +549,7 @@ class PacketStore {
byHash: this.byHash.size,
byObserver: this.byObserver.size,
byNode: this.byNode.size,
byTransmission: this.byTransmission.size,
}
};
}

View File

@@ -150,13 +150,13 @@
el.innerHTML = `
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value">${(rf.totalAllPackets || rf.totalPackets).toLocaleString()}</div>
<div class="stat-label">Total Packets</div>
<div class="stat-value">${(rf.totalTransmissions || rf.totalAllPackets || rf.totalPackets).toLocaleString()}</div>
<div class="stat-label">Total Transmissions</div>
<div class="stat-spark">${sparkSvg(rf.packetsPerHour.map(h=>h.count), 'var(--accent)')}</div>
</div>
<div class="stat-card">
<div class="stat-value">${rf.totalPackets.toLocaleString()}</div>
<div class="stat-label">With Signal Data</div>
<div class="stat-label">Observations with Signal</div>
</div>
<div class="stat-card">
<div class="stat-value">${topo.uniqueNodes}</div>
@@ -1167,7 +1167,7 @@
const enriched = nodes.filter(n => healthMap[n.public_key]).map(n => ({ ...n, health: { stats: healthMap[n.public_key].stats, observers: healthMap[n.public_key].observers } }));
// Compute rankings
const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalPackets || 0) - (a.health.stats.totalPackets || 0));
const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalTransmissions || b.health.stats.totalPackets || 0) - (a.health.stats.totalTransmissions || a.health.stats.totalPackets || 0));
const bySnr = [...enriched].filter(n => n.health.stats.avgSnr != null).sort((a, b) => b.health.stats.avgSnr - a.health.stats.avgSnr);
const byObservers = [...enriched].sort((a, b) => (b.health.observers?.length || 0) - (a.health.observers?.length || 0));
const byRecent = [...enriched].filter(n => n.health.stats.lastHeard).sort((a, b) => new Date(b.health.stats.lastHeard) - new Date(a.health.stats.lastHeard));
@@ -1223,7 +1223,7 @@
return `<tr>
<td>${nodeLink(n)}</td>
<td><span class="badge" style="background:${(ROLE_COLORS[n.role]||'#6b7280')}20;color:${ROLE_COLORS[n.role]||'#6b7280'}">${n.role}</span></td>
<td>${s.totalPackets || 0}</td>
<td>${s.totalTransmissions || s.totalPackets || 0}</td>
<td>${s.avgSnr != null ? s.avgSnr.toFixed(1) + ' dB' : '—'}</td>
<td>${n.health.observers?.length || 0}</td>
<td>${s.lastHeard ? timeAgo(s.lastHeard) : '—'}</td>
@@ -1240,7 +1240,7 @@
<td>${i + 1}</td>
<td>${nodeLink(n)}${claimedBadge(n)}</td>
<td><span class="badge" style="background:${(ROLE_COLORS[n.role]||'#6b7280')}20;color:${ROLE_COLORS[n.role]||'#6b7280'}">${n.role}</span></td>
<td>${n.health.stats.totalPackets || 0}</td>
<td>${n.health.stats.totalTransmissions || n.health.stats.totalPackets || 0}</td>
<td>${n.health.stats.packetsToday || 0}</td>
<td><a href="#/nodes/${encodeURIComponent(n.public_key)}/analytics" class="analytics-link">📊</a></td>
</tr>`).join('')}

View File

@@ -373,7 +373,7 @@
const el = document.getElementById('homeStats');
if (!el) return;
el.innerHTML = `
<div class="home-stat"><div class="val">${s.totalPackets ?? '—'}</div><div class="lbl">Packets</div></div>
<div class="home-stat"><div class="val">${s.totalTransmissions ?? s.totalPackets ?? '—'}</div><div class="lbl">Transmissions</div></div>
<div class="home-stat"><div class="val">${s.totalNodes ?? '—'}</div><div class="lbl">Nodes</div></div>
<div class="home-stat"><div class="val">${s.totalObservers ?? '—'}</div><div class="lbl">Observers</div></div>
<div class="home-stat"><div class="val">${s.packetsLast24h ?? '—'}</div><div class="lbl">Last 24h</div></div>

View File

@@ -22,7 +22,7 @@
<meta name="twitter:title" content="MeshCore Analyzer">
<meta name="twitter:description" content="Real-time MeshCore LoRa mesh network analyzer — live packet visualization, node tracking, channel decryption, and route analysis.">
<meta name="twitter:image" content="https://raw.githubusercontent.com/Kpa-clawbot/meshcore-analyzer/master/public/og-image.png">
<link rel="stylesheet" href="style.css?v=1773998477">
<link rel="stylesheet" href="style.css?v=1774042199">
<link rel="stylesheet" href="home.css">
<link rel="stylesheet" href="live.css?v=1774034490">
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
@@ -81,17 +81,17 @@
<script src="vendor/qrcode.js"></script>
<script src="roles.js?v=1774028201"></script>
<script src="app.js?v=1774034748"></script>
<script src="home.js?v=1774028201"></script>
<script src="packets.js?v=1774023016"></script>
<script src="home.js?v=1774042199"></script>
<script src="packets.js?v=1774044174"></script>
<script src="map.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="channels.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="nodes.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="nodes.js?v=1774042199" onerror="console.error('Failed to load:', this.src)"></script>
<script src="traces.js?v=1773972187" onerror="console.error('Failed to load:', this.src)"></script>
<script src="analytics.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="live.js?v=1774034600" onerror="console.error('Failed to load:', this.src)"></script>
<script src="analytics.js?v=1774042199" onerror="console.error('Failed to load:', this.src)"></script>
<script src="live.js?v=1774046040" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observers.js?v=1774018095" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observer-detail.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="node-analytics.js?v=1774028201" onerror="console.error('Failed to load:', this.src)"></script>
<script src="node-analytics.js?v=1774042199" onerror="console.error('Failed to load:', this.src)"></script>
<script src="perf.js?v=1773985649" onerror="console.error('Failed to load:', this.src)"></script>
</body>
</html>

View File

@@ -1020,13 +1020,13 @@
${hasLoc ? `<tr><td style="color:var(--text-muted);padding:4px 8px 4px 0;">Location</td><td>${n.lat.toFixed(5)}, ${n.lon.toFixed(5)}</td></tr>` : ''}
${stats.avgSnr != null ? `<tr><td style="color:var(--text-muted);padding:4px 8px 4px 0;">Avg SNR</td><td>${stats.avgSnr.toFixed(1)} dB</td></tr>` : ''}
${stats.avgHops != null ? `<tr><td style="color:var(--text-muted);padding:4px 8px 4px 0;">Avg Hops</td><td>${stats.avgHops.toFixed(1)}</td></tr>` : ''}
${stats.totalPackets ? `<tr><td style="color:var(--text-muted);padding:4px 8px 4px 0;">Total Packets</td><td>${stats.totalPackets}</td></tr>` : ''}
${stats.totalTransmissions || stats.totalPackets ? `<tr><td style="color:var(--text-muted);padding:4px 8px 4px 0;">Total Packets</td><td>${stats.totalTransmissions || stats.totalPackets}</td></tr>` : ''}
</table>`;
if (observers.length) {
html += `<h4 style="font-size:12px;margin:12px 0 6px;color:var(--text-muted);">Heard By</h4>
<div style="font-size:11px;">` +
observers.map(o => `<div style="padding:2px 0;">${escapeHtml(o.observer_name || o.observer_id.slice(0, 12))}${o.count} pkts</div>`).join('') +
observers.map(o => `<div style="padding:2px 0;"><a href="#/observers/${encodeURIComponent(o.observer_id)}" style="color:var(--accent);text-decoration:none;">${escapeHtml(o.observer_name || o.observer_id.slice(0, 12))}</a>${o.packetCount || o.count || 0} pkts</div>`).join('') +
'</div>';
}
@@ -1034,14 +1034,14 @@
html += `<h4 style="font-size:12px;margin:12px 0 6px;color:var(--text-muted);">Recent Packets</h4>
<div style="font-size:11px;max-height:200px;overflow-y:auto;">` +
recent.slice(0, 10).map(p => `<div style="padding:2px 0;display:flex;justify-content:space-between;">
<span>${escapeHtml(p.payload_type || '?')}</span>
<a href="#/packets/${encodeURIComponent(p.hash || '')}" style="color:var(--accent);text-decoration:none;">${escapeHtml(p.payload_type || '?')}${p.observation_count > 1 ? ' <span class="badge badge-obs" style="font-size:9px">👁 ' + p.observation_count + '</span>' : ''}</a>
<span style="color:var(--text-muted)">${p.timestamp ? timeAgo(p.timestamp) : '—'}</span>
</div>`).join('') +
'</div>';
}
html += `<div style="margin-top:12px;display:flex;gap:8px;">
<a href="#/nodes?selected=${encodeURIComponent(n.public_key)}" style="font-size:12px;color:var(--accent);">Full Detail →</a>
<a href="#/nodes/${encodeURIComponent(n.public_key)}" style="font-size:12px;color:var(--accent);">Full Detail →</a>
<a href="#/nodes/${encodeURIComponent(n.public_key)}/analytics" style="font-size:12px;color:var(--accent);">📊 Analytics</a>
</div></div>`;
@@ -1474,6 +1474,7 @@
const text = payload.text || payload.name || '';
const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : '';
const hopStr = hops.length ? `<span class="feed-hops">${hops.length}⇢</span>` : '';
const obsBadge = pkt.observation_count > 1 ? `<span class="badge badge-obs" style="font-size:10px;margin-left:4px">👁 ${pkt.observation_count}</span>` : '';
const item = document.createElement('div');
item.className = 'live-feed-item live-feed-enter';
@@ -1483,7 +1484,7 @@
item.innerHTML = `
<span class="feed-icon" style="color:${color}">${icon}</span>
<span class="feed-type" style="color:${color}">${typeName}</span>
${hopStr}
${hopStr}${obsBadge}
<span class="feed-text">${escapeHtml(preview)}</span>
<span class="feed-time">${new Date(pkt._ts || Date.now()).toLocaleTimeString([], {hour:'2-digit',minute:'2-digit',second:'2-digit'})}</span>
`;

View File

@@ -55,7 +55,7 @@
<div style="margin-bottom:12px">
<a href="#/nodes/${encodeURIComponent(n.public_key)}" style="color:var(--accent);text-decoration:none;font-size:12px">← Back to ${nodeName}</a>
<h2 style="margin:4px 0 2px;font-size:18px">📊 ${nodeName} — Analytics</h2>
<div style="color:var(--text-muted);font-size:11px">${n.role || 'Unknown role'} · ${s.totalPackets} packets in ${days}d window</div>
<div style="color:var(--text-muted);font-size:11px">${n.role || 'Unknown role'} · ${s.totalTransmissions || s.totalPackets} packets in ${days}d window</div>
</div>
<div class="analytics-time-range" id="timeRangeBtns">

View File

@@ -128,7 +128,7 @@
<dl class="detail-meta">
<dt>Last Heard</dt><dd>${lastHeard ? timeAgo(lastHeard) : (n.last_seen ? timeAgo(n.last_seen) : '—')}</dd>
<dt>First Seen</dt><dd>${n.first_seen ? new Date(n.first_seen).toLocaleString() : '—'}</dd>
<dt>Total Packets</dt><dd>${stats.totalPackets || n.advert_count || 0}</dd>
<dt>Total Packets</dt><dd>${stats.totalTransmissions || stats.totalPackets || n.advert_count || 0}${stats.totalObservations && stats.totalObservations !== (stats.totalTransmissions || stats.totalPackets) ? ' <span class="text-muted" style="font-size:0.85em">(seen ' + stats.totalObservations + '×)</span>' : ''}</dd>
<dt>Packets Today</dt><dd>${stats.packetsToday || 0}</dd>
${stats.avgSnr != null ? `<dt>Avg SNR</dt><dd>${stats.avgSnr.toFixed(1)} dB</dd>` : ''}
${stats.avgHops ? `<dt>Avg Hops</dt><dd>${stats.avgHops}</dd>` : ''}
@@ -161,9 +161,10 @@
const obs = p.observer_name || p.observer_id;
const snr = p.snr != null ? ` · SNR ${p.snr}dB` : '';
const rssi = p.rssi != null ? ` · RSSI ${p.rssi}dBm` : '';
const obsBadge = p.observation_count > 1 ? ` <span class="badge badge-obs" title="Seen ${p.observation_count} times">👁 ${p.observation_count}</span>` : '';
return `<div class="node-activity-item">
<span class="node-activity-time">${timeAgo(p.timestamp)}</span>
<span>${typeLabel}${detail}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi}</span>
<span>${typeLabel}${detail}${obsBadge}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi}</span>
<a href="#/packets/id/${p.id}" class="ch-analyze-link" style="margin-left:8px;font-size:0.8em">Analyze →</a>
</div>`;
}).join('') : '<div class="text-muted">No recent packets</div>'}
@@ -426,7 +427,7 @@
const role = (n.role || '').toLowerCase();
const { degradedMs, silentMs } = getHealthThresholds(role);
const statusLabel = statusAge < degradedMs ? '🟢 Active' : statusAge < silentMs ? '🟡 Degraded' : '🔴 Silent';
const totalPackets = stats.totalPackets || n.advert_count || 0;
const totalPackets = stats.totalTransmissions || stats.totalPackets || n.advert_count || 0;
panel.innerHTML = `
<div class="node-detail">
@@ -480,6 +481,7 @@
<span class="advert-dot" style="background:${roleColor}"></span>
<div class="advert-info">
<strong>${timeAgo(a.timestamp)}</strong> ${icon} ${pType}${detail}
${a.observation_count > 1 ? ' <span class="badge badge-obs">👁 ' + a.observation_count + '</span>' : ''}
${obs ? ' via ' + escapeHtml(obs) : ''}
${a.snr != null ? ` · SNR ${a.snr}dB` : ''}${a.rssi != null ? ` · RSSI ${a.rssi}dBm` : ''}
<br><a href="#/packets/id/${a.id}" class="ch-analyze-link">Analyze </a>

View File

@@ -221,6 +221,7 @@
const existing = packets.find(g => g.hash === h);
if (existing) {
existing.count = (existing.count || 1) + 1;
existing.observation_count = (existing.observation_count || 1) + 1;
existing.latest = p.timestamp > existing.latest ? p.timestamp : existing.latest;
// Track unique observers
if (p.observer_id && p.observer_id !== existing.observer_id) {
@@ -630,7 +631,7 @@
<td class="col-type">${p.payload_type != null ? `<span class="badge badge-${groupTypeClass}">${groupTypeName}</span>` : '—'}</td>
<td class="col-observer">${isSingle ? truncate(obsName(p.observer_id), 16) : truncate(obsName(p.observer_id), 10) + (p.observer_count > 1 ? ' +' + (p.observer_count - 1) : '')}</td>
<td class="col-path"><span class="path-hops">${groupPathStr}</span></td>
<td class="col-rpt">${isSingle ? '' : p.count}</td>
<td class="col-rpt">${p.observation_count > 1 ? '<span class="badge badge-obs" title="Seen ' + p.observation_count + ' times">👁 ' + p.observation_count + '</span>' : (isSingle ? '' : p.count)}</td>
<td class="col-details">${getDetailPreview((() => { try { return JSON.parse(p.decoded_json || '{}'); } catch { return {}; } })())}</td>
</tr>`;
// Child rows (loaded async when expanded)
@@ -1104,11 +1105,12 @@
renderTableRows();
return;
}
// Load children for this hash
// Load children (observations) for this hash
try {
const data = await api(`/packets?hash=${hash}&limit=20`);
const data = await api(`/packets?hash=${hash}&limit=1&expand=observations`);
const pkt = (data.packets || [])[0];
const group = packets.find(p => p.hash === hash);
if (group) group._children = data.packets || [];
if (group && pkt) group._children = (pkt.observations || []).map(o => ({...pkt, ...o, _isObservation: true}));
// Resolve any new hops from children
const childHops = new Set();
for (const c of (group?._children || [])) {

View File

@@ -263,6 +263,11 @@ a:focus-visible, button:focus-visible, input:focus-visible, select:focus-visible
font-size: 10px; font-weight: 700; font-family: var(--mono);
background: var(--nav-bg); color: #fff; letter-spacing: .5px;
}
.badge-obs {
display: inline-block; padding: 1px 6px; border-radius: 10px;
font-size: 10px; font-weight: 600;
background: #ede9fe; color: #6d28d9;
}
/* === Monospace === */
.mono { font-family: var(--mono); font-size: 12px; }

144
scripts/migrate-dedup.js Normal file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env node
/**
* Milestone 1: Packet Dedup Schema Migration
*
* Creates `transmissions` and `observations` tables from the existing `packets` table.
* Idempotent — drops and recreates new tables on each run.
* Does NOT touch the original `packets` table.
*
* Usage: node scripts/migrate-dedup.js <path-to-meshcore.db>
*/
const Database = require('better-sqlite3');
const path = require('path');
const dbPath = process.argv[2];
if (!dbPath) {
console.error('Usage: node scripts/migrate-dedup.js <path-to-meshcore.db>');
process.exit(1);
}
const start = Date.now();
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
// --- Drop existing new tables (idempotent) ---
console.log('Dropping existing transmissions/observations tables if they exist...');
db.exec('DROP TABLE IF EXISTS observations');
db.exec('DROP TABLE IF EXISTS transmissions');
// --- Create new tables ---
console.log('Creating transmissions and observations tables...');
db.exec(`
CREATE TABLE transmissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_hex TEXT NOT NULL,
hash TEXT NOT NULL UNIQUE,
first_seen TEXT NOT NULL,
route_type INTEGER,
payload_type INTEGER,
payload_version INTEGER,
decoded_json TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transmission_id INTEGER NOT NULL REFERENCES transmissions(id),
hash TEXT NOT NULL,
observer_id TEXT,
observer_name TEXT,
direction TEXT,
snr REAL,
rssi REAL,
score INTEGER,
path_json TEXT,
timestamp TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX idx_transmissions_hash ON transmissions(hash);
CREATE INDEX idx_transmissions_first_seen ON transmissions(first_seen);
CREATE INDEX idx_transmissions_payload_type ON transmissions(payload_type);
CREATE INDEX idx_observations_hash ON observations(hash);
CREATE INDEX idx_observations_transmission_id ON observations(transmission_id);
CREATE INDEX idx_observations_observer_id ON observations(observer_id);
CREATE INDEX idx_observations_timestamp ON observations(timestamp);
`);
// --- Read all packets ordered by timestamp ---
console.log('Reading packets...');
const packets = db.prepare('SELECT * FROM packets ORDER BY timestamp ASC').all();
const totalPackets = packets.length;
console.log(`Total packets: ${totalPackets}`);
// --- Group by hash and migrate ---
const insertTransmission = db.prepare(`
INSERT OR IGNORE INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
const insertObservation = db.prepare(`
INSERT INTO observations (transmission_id, hash, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const hashToTransmissionId = new Map();
let transmissionCount = 0;
const lookupTransmission = db.prepare('SELECT id FROM transmissions WHERE hash = ?');
const migrate = db.transaction(() => {
for (const pkt of packets) {
let txId = hashToTransmissionId.get(pkt.hash);
if (txId === undefined) {
const result = insertTransmission.run(
pkt.raw_hex, pkt.hash, pkt.timestamp,
pkt.route_type, pkt.payload_type, pkt.payload_version, pkt.decoded_json
);
if (result.changes > 0) {
txId = result.lastInsertRowid;
} else {
// Already inserted by dual-write, look up existing
txId = lookupTransmission.get(pkt.hash).id;
}
hashToTransmissionId.set(pkt.hash, txId);
transmissionCount++;
}
insertObservation.run(
txId, pkt.hash, pkt.observer_id, pkt.observer_name, pkt.direction,
pkt.snr, pkt.rssi, pkt.score, pkt.path_json, pkt.timestamp
);
}
});
migrate();
// --- Verify ---
const obsCount = db.prepare('SELECT COUNT(*) as c FROM observations').get().c;
const txCount = db.prepare('SELECT COUNT(*) as c FROM transmissions').get().c;
const distinctHash = db.prepare('SELECT COUNT(DISTINCT hash) as c FROM packets').get().c;
const elapsed = ((Date.now() - start) / 1000).toFixed(2);
console.log('\n=== Migration Stats ===');
console.log(`Total packets (source): ${totalPackets}`);
console.log(`Unique transmissions created: ${transmissionCount}`);
console.log(`Observations created: ${obsCount}`);
console.log(`Dedup ratio: ${(totalPackets / transmissionCount).toFixed(2)}x`);
console.log(`Time taken: ${elapsed}s`);
console.log('\n=== Verification ===');
const obsOk = obsCount === totalPackets;
const txOk = txCount === distinctHash;
console.log(`observations (${obsCount}) = packets (${totalPackets}): ${obsOk ? 'PASS ✓' : 'FAIL ✗'}`);
console.log(`transmissions (${txCount}) = distinct hashes (${distinctHash}): ${txOk ? 'PASS ✓' : 'FAIL ✗'}`);
if (!obsOk || !txOk) {
console.error('\nVerification FAILED!');
process.exit(1);
}
console.log('\nMigration complete!');
db.close();

View File

@@ -514,7 +514,7 @@ for (const source of mqttSources) {
const observerId = parts[2] || null;
const region = parts[1] || null;
const packetId = pktStore.insert({
const pktData = {
raw_hex: msg.raw,
timestamp: now,
observer_id: observerId,
@@ -527,7 +527,9 @@ for (const source of mqttSources) {
payload_version: decoded.header.payloadVersion,
path_json: JSON.stringify(decoded.path.hops),
decoded_json: JSON.stringify(decoded.payload),
});
};
const packetId = pktStore.insert(pktData);
try { db.insertTransmission(pktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
if (decoded.path.hops.length > 0) {
db.insertPath(packetId, decoded.path.hops);
@@ -560,7 +562,9 @@ for (const source of mqttSources) {
cache.debouncedInvalidateAll();
const fullPacket = pktStore.getById(packetId);
const broadcastData = { id: packetId, raw: msg.raw, decoded, snr: msg.SNR, rssi: msg.RSSI, hash: msg.hash, observer: observerId, packet: fullPacket };
const tx = pktStore.byTransmission.get(pktData.hash);
const observation_count = tx ? tx.observation_count : 1;
const broadcastData = { id: packetId, raw: msg.raw, decoded, snr: msg.SNR, rssi: msg.RSSI, hash: msg.hash, observer: observerId, packet: fullPacket, observation_count };
broadcast({ type: 'packet', data: broadcastData });
if (decoded.header.payloadTypeName === 'GRP_TXT') {
@@ -598,7 +602,7 @@ for (const source of mqttSources) {
const role = advert.role || (advert.flags?.repeater ? 'repeater' : advert.flags?.room ? 'room' : 'companion');
db.upsertNode({ public_key: pubKey, name, role, lat, lon, last_seen: now });
const packetId = pktStore.insert({
const advertPktData = {
raw_hex: null,
timestamp: now,
observer_id: 'companion',
@@ -611,7 +615,9 @@ for (const source of mqttSources) {
payload_version: 0,
path_json: JSON.stringify([]),
decoded_json: JSON.stringify(advert),
});
};
const packetId = pktStore.insert(advertPktData);
try { db.insertTransmission(advertPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'ADVERT' }, payload: advert } } });
}
return;
@@ -629,7 +635,7 @@ for (const source of mqttSources) {
const senderKey = `sender-${senderName.toLowerCase().replace(/[^a-z0-9]/g, '')}`;
db.upsertNode({ public_key: senderKey, name: senderName, role: 'companion', lat: null, lon: null, last_seen: now });
}
const packetId = pktStore.insert({
const chPktData = {
raw_hex: null,
timestamp: now,
observer_id: 'companion',
@@ -642,7 +648,9 @@ for (const source of mqttSources) {
payload_version: 0,
path_json: JSON.stringify([]),
decoded_json: JSON.stringify(channelMsg),
});
};
const packetId = pktStore.insert(chPktData);
try { db.insertTransmission(chPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } });
broadcast({ type: 'message', data: { id: packetId, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } });
return;
@@ -651,7 +659,7 @@ for (const source of mqttSources) {
// Handle direct messages
if (topic.startsWith('meshcore/message/direct/')) {
const dm = msg.payload || msg;
const packetId = pktStore.insert({
const dmPktData = {
raw_hex: null,
timestamp: dm.timestamp || now,
observer_id: 'companion',
@@ -663,7 +671,9 @@ for (const source of mqttSources) {
payload_version: 0,
path_json: JSON.stringify(dm.hops || []),
decoded_json: JSON.stringify(dm),
});
};
const packetId = pktStore.insert(dmPktData);
try { db.insertTransmission(dmPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'TXT_MSG' }, payload: dm } } });
return;
}
@@ -671,7 +681,7 @@ for (const source of mqttSources) {
// Handle traceroute
if (topic.startsWith('meshcore/traceroute/')) {
const trace = msg.payload || msg;
const packetId = pktStore.insert({
const tracePktData = {
raw_hex: null,
timestamp: now,
observer_id: 'companion',
@@ -683,7 +693,9 @@ for (const source of mqttSources) {
payload_version: 0,
path_json: JSON.stringify(trace.hops || trace.path || []),
decoded_json: JSON.stringify(trace),
});
};
const packetId = pktStore.insert(tracePktData);
try { db.insertTransmission(tracePktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'TRACE' }, payload: trace } } });
return;
}
@@ -739,11 +751,23 @@ app.get('/api/packets', (req, res) => {
return res.json({ packets: paged, total, limit: Number(limit), offset: Number(offset) });
}
// groupByHash is now the default behavior (transmissions ARE grouped) — keep param for compat
if (groupByHash === 'true') {
return res.json(pktStore.queryGrouped({ limit, offset, type, route, region, observer, hash, since, until, node }));
}
res.json(pktStore.query({ limit, offset, type, route, region, observer, hash, since, until, node, order }));
const expand = req.query.expand;
const result = pktStore.query({ limit, offset, type, route, region, observer, hash, since, until, node, order });
// Strip observations[] from default response for bandwidth; include with ?expand=observations
if (expand !== 'observations') {
result.packets = result.packets.map(p => {
const { observations, ...rest } = p;
return rest;
});
}
res.json(result);
});
// Lightweight endpoint: just timestamps for timeline sparkline
@@ -774,7 +798,12 @@ app.get('/api/packets/:id', (req, res) => {
// Build byte breakdown
const breakdown = buildBreakdown(packet.raw_hex, decoded);
res.json({ packet, path: pathHops, breakdown });
// Include sibling observations for this transmission
const transmission = packet.hash ? pktStore.byTransmission.get(packet.hash) : null;
const siblingObservations = transmission ? transmission.observations : [];
const observation_count = transmission ? transmission.observation_count : 1;
res.json({ packet, path: pathHops, breakdown, observation_count, observations: siblingObservations });
});
function buildBreakdown(rawHex, decoded) {
@@ -860,7 +889,7 @@ app.post('/api/packets', (req, res) => {
const decoded = decoder.decodePacket(hex, channelKeys);
const now = new Date().toISOString();
const packetId = pktStore.insert({
const apiPktData = {
raw_hex: hex.toUpperCase(),
timestamp: now,
observer_id: observer || null,
@@ -872,7 +901,9 @@ app.post('/api/packets', (req, res) => {
payload_version: decoded.header.payloadVersion,
path_json: JSON.stringify(decoded.path.hops),
decoded_json: JSON.stringify(decoded.payload),
});
};
const packetId = pktStore.insert(apiPktData);
try { db.insertTransmission(apiPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
if (decoded.path.hops.length > 0) {
db.insertPath(packetId, decoded.path.hops);
@@ -954,10 +985,12 @@ app.get('/api/nodes/bulk-health', (req, res) => {
const results = [];
for (const node of nodes) {
const packets = pktStore.byNode.get(node.public_key) || [];
let totalPackets = packets.length, packetsToday = 0, snrSum = 0, snrCount = 0, lastHeard = null;
let packetsToday = 0, snrSum = 0, snrCount = 0, lastHeard = null;
const observers = {};
let totalObservations = 0;
for (const pkt of packets) {
totalObservations += pkt.observation_count || 1;
if (pkt.timestamp > todayISO) packetsToday++;
if (pkt.snr != null) { snrSum += pkt.snr; snrCount++; }
if (!lastHeard || pkt.timestamp > lastHeard) lastHeard = pkt.timestamp;
@@ -984,7 +1017,12 @@ app.get('/api/nodes/bulk-health', (req, res) => {
results.push({
public_key: node.public_key, name: node.name, role: node.role,
lat: node.lat, lon: node.lon,
stats: { totalPackets, packetsToday, avgSnr: snrCount ? snrSum / snrCount : null, lastHeard },
stats: {
totalTransmissions: packets.length,
totalObservations,
totalPackets: packets.length, // backward compat
packetsToday, avgSnr: snrCount ? snrSum / snrCount : null, lastHeard
},
observers: observerRows
});
}
@@ -1852,10 +1890,22 @@ app.get('/api/nodes/:pubkey/health', (req, res) => {
const recentPackets = packets.slice(0, 20);
// Count transmissions vs observations
const counts = pktStore.countForNode(pubkey);
const recentWithoutObs = recentPackets.map(p => {
const { observations, ...rest } = p;
return { ...rest, observation_count: p.observation_count || 1 };
});
const result = {
node: node.node || node, observers,
stats: { totalPackets: packets.length, packetsToday, avgSnr: snrN ? snrSum / snrN : null, avgHops: hopCount > 0 ? Math.round(totalHops / hopCount) : 0, lastHeard },
recentPackets
stats: {
totalTransmissions: counts.transmissions,
totalObservations: counts.observations,
totalPackets: counts.transmissions, // backward compat
packetsToday, avgSnr: snrN ? snrSum / snrN : null, avgHops: hopCount > 0 ? Math.round(totalHops / hopCount) : 0, lastHeard
},
recentPackets: recentWithoutObs
};
cache.set(_ck, result, TTL.nodeHealth);
res.json(result);