From 2c6148fd2df3dbd63971e1b10723ab8330f1589a Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 20:22:30 +0000 Subject: [PATCH 1/9] Add dedup migration script (Milestone 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Creates transmissions and observations tables from existing packets table. - Groups packets by hash → 1 transmission per unique hash - Creates 1 observation per original packet row with FK to transmission - Idempotent: drops and recreates new tables on each run - Does NOT modify the original packets table - Prints stats and verifies counts match Tested on test DB: 33813 packets → 11530 transmissions (2.93x dedup ratio) --- scripts/migrate-dedup.js | 137 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 scripts/migrate-dedup.js diff --git a/scripts/migrate-dedup.js b/scripts/migrate-dedup.js new file mode 100644 index 0000000..3371888 --- /dev/null +++ b/scripts/migrate-dedup.js @@ -0,0 +1,137 @@ +#!/usr/bin/env node +/** + * Milestone 1: Packet Dedup Schema Migration + * + * Creates `transmissions` and `observations` tables from the existing `packets` table. + * Idempotent — drops and recreates new tables on each run. + * Does NOT touch the original `packets` table. + * + * Usage: node scripts/migrate-dedup.js + */ + +const Database = require('better-sqlite3'); +const path = require('path'); + +const dbPath = process.argv[2]; +if (!dbPath) { + console.error('Usage: node scripts/migrate-dedup.js '); + process.exit(1); +} + +const start = Date.now(); +const db = new Database(dbPath); +db.pragma('journal_mode = WAL'); +db.pragma('foreign_keys = ON'); + +// --- Drop existing new tables (idempotent) --- +console.log('Dropping existing transmissions/observations tables if they exist...'); +db.exec('DROP TABLE IF EXISTS observations'); +db.exec('DROP TABLE IF EXISTS transmissions'); + +// --- Create new tables --- +console.log('Creating transmissions and observations tables...'); +db.exec(` + CREATE TABLE transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT, + hash TEXT NOT NULL UNIQUE, + first_seen TEXT NOT NULL, + route_type INTEGER, + payload_type INTEGER, + payload_version INTEGER, + decoded_json TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transmission_id INTEGER NOT NULL REFERENCES transmissions(id), + hash TEXT NOT NULL, + observer_id TEXT, + observer_name TEXT, + direction TEXT, + snr REAL, + rssi REAL, + score INTEGER, + path_json TEXT, + timestamp TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE INDEX idx_transmissions_hash ON transmissions(hash); + CREATE INDEX idx_transmissions_first_seen ON transmissions(first_seen); + CREATE INDEX idx_transmissions_payload_type ON transmissions(payload_type); + CREATE INDEX idx_observations_hash ON observations(hash); + CREATE INDEX idx_observations_transmission_id ON observations(transmission_id); + CREATE INDEX idx_observations_observer_id ON observations(observer_id); + CREATE INDEX idx_observations_timestamp ON observations(timestamp); +`); + +// --- Read all packets ordered by timestamp --- +console.log('Reading packets...'); +const packets = db.prepare('SELECT * FROM packets ORDER BY timestamp ASC').all(); +const totalPackets = packets.length; +console.log(`Total packets: ${totalPackets}`); + +// --- Group by hash and migrate --- +const insertTransmission = db.prepare(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (?, ?, ?, ?, ?, ?, ?) +`); + +const insertObservation = db.prepare(` + INSERT INTO observations (transmission_id, hash, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +`); + +const hashToTransmissionId = new Map(); +let transmissionCount = 0; + +const migrate = db.transaction(() => { + for (const pkt of packets) { + let txId = hashToTransmissionId.get(pkt.hash); + if (txId === undefined) { + const result = insertTransmission.run( + pkt.raw_hex, pkt.hash, pkt.timestamp, + pkt.route_type, pkt.payload_type, pkt.payload_version, pkt.decoded_json + ); + txId = result.lastInsertRowid; + hashToTransmissionId.set(pkt.hash, txId); + transmissionCount++; + } + insertObservation.run( + txId, pkt.hash, pkt.observer_id, pkt.observer_name, pkt.direction, + pkt.snr, pkt.rssi, pkt.score, pkt.path_json, pkt.timestamp + ); + } +}); + +migrate(); + +// --- Verify --- +const obsCount = db.prepare('SELECT COUNT(*) as c FROM observations').get().c; +const txCount = db.prepare('SELECT COUNT(*) as c FROM transmissions').get().c; +const distinctHash = db.prepare('SELECT COUNT(DISTINCT hash) as c FROM packets').get().c; + +const elapsed = ((Date.now() - start) / 1000).toFixed(2); + +console.log('\n=== Migration Stats ==='); +console.log(`Total packets (source): ${totalPackets}`); +console.log(`Unique transmissions created: ${transmissionCount}`); +console.log(`Observations created: ${obsCount}`); +console.log(`Dedup ratio: ${(totalPackets / transmissionCount).toFixed(2)}x`); +console.log(`Time taken: ${elapsed}s`); + +console.log('\n=== Verification ==='); +const obsOk = obsCount === totalPackets; +const txOk = txCount === distinctHash; +console.log(`observations (${obsCount}) = packets (${totalPackets}): ${obsOk ? 'PASS ✓' : 'FAIL ✗'}`); +console.log(`transmissions (${txCount}) = distinct hashes (${distinctHash}): ${txOk ? 'PASS ✓' : 'FAIL ✗'}`); + +if (!obsOk || !txOk) { + console.error('\nVerification FAILED!'); + process.exit(1); +} + +console.log('\nMigration complete!'); +db.close(); From d7e415daa72fa4df0dc546b2d762a676a3ba82b5 Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 20:24:13 +0000 Subject: [PATCH 2/9] =?UTF-8?q?fix:=20raw=5Fhex=20NOT=20NULL=20in=20transm?= =?UTF-8?q?issions=20schema=20=E2=80=94=20deleted=204=20junk=20test=20rows?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/migrate-dedup.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/migrate-dedup.js b/scripts/migrate-dedup.js index 3371888..d74ffa9 100644 --- a/scripts/migrate-dedup.js +++ b/scripts/migrate-dedup.js @@ -33,7 +33,7 @@ console.log('Creating transmissions and observations tables...'); db.exec(` CREATE TABLE transmissions ( id INTEGER PRIMARY KEY AUTOINCREMENT, - raw_hex TEXT, + raw_hex TEXT NOT NULL, hash TEXT NOT NULL UNIQUE, first_seen TEXT NOT NULL, route_type INTEGER, From baa60cac0fbf4c738e3b75c843ae0c4d264c1670 Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 20:29:03 +0000 Subject: [PATCH 3/9] M2: Dual-write ingest to transmissions/observations tables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add transmissions and observations schema to db.js init - Add insertTransmission() function: upsert transmission by hash, always insert observation row - All 6 pktStore.insert() call sites in server.js now also call db.insertTransmission() with try/catch (non-fatal on error) - packet-store.js: add byTransmission Map index (hash → transmission with observations array) for future M3 query migration - Existing insertPacket() and all read paths unchanged --- db.js | 90 ++++++++++++++++++++++++++++++++++++++++++++++++- packet-store.js | 29 ++++++++++++++++ server.js | 36 +++++++++++++------- 3 files changed, 142 insertions(+), 13 deletions(-) diff --git a/db.js b/db.js index 9083b2f..fb36d56 100644 --- a/db.js +++ b/db.js @@ -71,6 +71,41 @@ db.exec(` CREATE INDEX IF NOT EXISTS idx_packets_payload_type ON packets(payload_type); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen); + + CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT NOT NULL, + hash TEXT NOT NULL UNIQUE, + first_seen TEXT NOT NULL, + route_type INTEGER, + payload_type INTEGER, + payload_version INTEGER, + decoded_json TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transmission_id INTEGER NOT NULL REFERENCES transmissions(id), + hash TEXT NOT NULL, + observer_id TEXT, + observer_name TEXT, + direction TEXT, + snr REAL, + rssi REAL, + score INTEGER, + path_json TEXT, + timestamp TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash); + CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen); + CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type); + CREATE INDEX IF NOT EXISTS idx_observations_hash ON observations(hash); + CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id); + CREATE INDEX IF NOT EXISTS idx_observations_observer_id ON observations(observer_id); + CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp); `); // --- Migrations for existing DBs --- @@ -144,6 +179,16 @@ const stmts = { countNodes: db.prepare(`SELECT COUNT(*) as count FROM nodes`), countObservers: db.prepare(`SELECT COUNT(*) as count FROM observers`), countRecentPackets: db.prepare(`SELECT COUNT(*) as count FROM packets WHERE timestamp > ?`), + getTransmissionByHash: db.prepare(`SELECT id, first_seen FROM transmissions WHERE hash = ?`), + insertTransmission: db.prepare(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (@raw_hex, @hash, @first_seen, @route_type, @payload_type, @payload_version, @decoded_json) + `), + updateTransmissionFirstSeen: db.prepare(`UPDATE transmissions SET first_seen = @first_seen WHERE id = @id`), + insertObservation: db.prepare(` + INSERT INTO observations (transmission_id, hash, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp) + VALUES (@transmission_id, @hash, @observer_id, @observer_name, @direction, @snr, @rssi, @score, @path_json, @timestamp) + `), }; // --- Helper functions --- @@ -168,6 +213,49 @@ function insertPacket(data) { return stmts.insertPacket.run(d).lastInsertRowid; } +function insertTransmission(data) { + const hash = data.hash; + if (!hash) return null; // Can't deduplicate without a hash + + const timestamp = data.timestamp || new Date().toISOString(); + let transmissionId; + + const existing = stmts.getTransmissionByHash.get(hash); + if (existing) { + transmissionId = existing.id; + // Update first_seen if this observation is earlier + if (timestamp < existing.first_seen) { + stmts.updateTransmissionFirstSeen.run({ id: transmissionId, first_seen: timestamp }); + } + } else { + const result = stmts.insertTransmission.run({ + raw_hex: data.raw_hex || '', + hash, + first_seen: timestamp, + route_type: data.route_type ?? null, + payload_type: data.payload_type ?? null, + payload_version: data.payload_version ?? null, + decoded_json: data.decoded_json || null, + }); + transmissionId = result.lastInsertRowid; + } + + const obsResult = stmts.insertObservation.run({ + transmission_id: transmissionId, + hash, + observer_id: data.observer_id || null, + observer_name: data.observer_name || null, + direction: data.direction || null, + snr: data.snr ?? null, + rssi: data.rssi ?? null, + score: data.score ?? null, + path_json: data.path_json || null, + timestamp, + }); + + return { transmissionId, observationId: obsResult.lastInsertRowid }; +} + function insertPath(packetId, hops) { const tx = db.transaction((hops) => { for (let i = 0; i < hops.length; i++) { @@ -557,4 +645,4 @@ function getNodeAnalytics(pubkey, days) { }; } -module.exports = { db, insertPacket, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics }; +module.exports = { db, insertPacket, insertTransmission, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics }; diff --git a/packet-store.js b/packet-store.js index 4757f8f..82ba002 100644 --- a/packet-store.js +++ b/packet-store.js @@ -23,6 +23,7 @@ class PacketStore { this.byHash = new Map(); // hash → [packet, ...] this.byObserver = new Map(); // observer_id → [packet, ...] this.byNode = new Map(); // pubkey → [packet, ...] + this.byTransmission = new Map(); // hash → {id, hash, first_seen, payload_type, decoded_json, observations: []} this.loaded = false; this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 }; @@ -69,6 +70,33 @@ class PacketStore { // Index by node pubkeys mentioned in decoded_json this._indexByNode(pkt); + + // Index by transmission (dedup view) + if (pkt.hash) { + if (!this.byTransmission.has(pkt.hash)) { + this.byTransmission.set(pkt.hash, { + id: pkt.id, + hash: pkt.hash, + first_seen: pkt.timestamp, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + observations: [], + }); + } + const tx = this.byTransmission.get(pkt.hash); + if (pkt.timestamp < tx.first_seen) tx.first_seen = pkt.timestamp; + tx.observations.push({ + id: pkt.id, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + direction: pkt.direction, + snr: pkt.snr, + rssi: pkt.rssi, + score: pkt.score, + path_json: pkt.path_json, + timestamp: pkt.timestamp, + }); + } } /** Extract node pubkeys/names from decoded_json and index */ @@ -311,6 +339,7 @@ class PacketStore { byHash: this.byHash.size, byObserver: this.byObserver.size, byNode: this.byNode.size, + byTransmission: this.byTransmission.size, } }; } diff --git a/server.js b/server.js index b8a982e..5d8f3bc 100644 --- a/server.js +++ b/server.js @@ -514,7 +514,7 @@ for (const source of mqttSources) { const observerId = parts[2] || null; const region = parts[1] || null; - const packetId = pktStore.insert({ + const pktData = { raw_hex: msg.raw, timestamp: now, observer_id: observerId, @@ -527,7 +527,9 @@ for (const source of mqttSources) { payload_version: decoded.header.payloadVersion, path_json: JSON.stringify(decoded.path.hops), decoded_json: JSON.stringify(decoded.payload), - }); + }; + const packetId = pktStore.insert(pktData); + try { db.insertTransmission(pktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } if (decoded.path.hops.length > 0) { db.insertPath(packetId, decoded.path.hops); @@ -598,7 +600,7 @@ for (const source of mqttSources) { const role = advert.role || (advert.flags?.repeater ? 'repeater' : advert.flags?.room ? 'room' : 'companion'); db.upsertNode({ public_key: pubKey, name, role, lat, lon, last_seen: now }); - const packetId = pktStore.insert({ + const advertPktData = { raw_hex: null, timestamp: now, observer_id: 'companion', @@ -611,7 +613,9 @@ for (const source of mqttSources) { payload_version: 0, path_json: JSON.stringify([]), decoded_json: JSON.stringify(advert), - }); + }; + const packetId = pktStore.insert(advertPktData); + try { db.insertTransmission(advertPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'ADVERT' }, payload: advert } } }); } return; @@ -629,7 +633,7 @@ for (const source of mqttSources) { const senderKey = `sender-${senderName.toLowerCase().replace(/[^a-z0-9]/g, '')}`; db.upsertNode({ public_key: senderKey, name: senderName, role: 'companion', lat: null, lon: null, last_seen: now }); } - const packetId = pktStore.insert({ + const chPktData = { raw_hex: null, timestamp: now, observer_id: 'companion', @@ -642,7 +646,9 @@ for (const source of mqttSources) { payload_version: 0, path_json: JSON.stringify([]), decoded_json: JSON.stringify(channelMsg), - }); + }; + const packetId = pktStore.insert(chPktData); + try { db.insertTransmission(chPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } }); broadcast({ type: 'message', data: { id: packetId, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } }); return; @@ -651,7 +657,7 @@ for (const source of mqttSources) { // Handle direct messages if (topic.startsWith('meshcore/message/direct/')) { const dm = msg.payload || msg; - const packetId = pktStore.insert({ + const dmPktData = { raw_hex: null, timestamp: dm.timestamp || now, observer_id: 'companion', @@ -663,7 +669,9 @@ for (const source of mqttSources) { payload_version: 0, path_json: JSON.stringify(dm.hops || []), decoded_json: JSON.stringify(dm), - }); + }; + const packetId = pktStore.insert(dmPktData); + try { db.insertTransmission(dmPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'TXT_MSG' }, payload: dm } } }); return; } @@ -671,7 +679,7 @@ for (const source of mqttSources) { // Handle traceroute if (topic.startsWith('meshcore/traceroute/')) { const trace = msg.payload || msg; - const packetId = pktStore.insert({ + const tracePktData = { raw_hex: null, timestamp: now, observer_id: 'companion', @@ -683,7 +691,9 @@ for (const source of mqttSources) { payload_version: 0, path_json: JSON.stringify(trace.hops || trace.path || []), decoded_json: JSON.stringify(trace), - }); + }; + const packetId = pktStore.insert(tracePktData); + try { db.insertTransmission(tracePktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } broadcast({ type: 'packet', data: { id: packetId, decoded: { header: { payloadTypeName: 'TRACE' }, payload: trace } } }); return; } @@ -860,7 +870,7 @@ app.post('/api/packets', (req, res) => { const decoded = decoder.decodePacket(hex, channelKeys); const now = new Date().toISOString(); - const packetId = pktStore.insert({ + const apiPktData = { raw_hex: hex.toUpperCase(), timestamp: now, observer_id: observer || null, @@ -872,7 +882,9 @@ app.post('/api/packets', (req, res) => { payload_version: decoded.header.payloadVersion, path_json: JSON.stringify(decoded.path.hops), decoded_json: JSON.stringify(decoded.payload), - }); + }; + const packetId = pktStore.insert(apiPktData); + try { db.insertTransmission(apiPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); } if (decoded.path.hops.length > 0) { db.insertPath(packetId, decoded.path.hops); From 84f33aef7b1fc015e24439b9cff6992ba889089f Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 20:44:32 +0000 Subject: [PATCH 4/9] M3: Restructure in-memory store around transmissions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - load() reads from transmissions JOIN observations (with legacy fallback) - byHash now maps hash → single transmission object (1:1) - byNode maps pubkey → [transmissions] (deduped, no inflated observations) - byTransmission is the primary data structure - byId maps observation IDs for backward-compat packet detail links - byObserver still maps observer_id → [observations] - getSiblings() returns observations from transmission - findPacketsForNode() returns unique transmissions - query()/queryGrouped() work with transmission-centric model - All returned objects maintain backward-compatible fields - SQLite-only fallback path (NO_MEMORY_STORE=1) unchanged - Tested: 11.6K transmissions from 37.5K observations (3.2x dedup) --- packet-store.js | 496 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 353 insertions(+), 143 deletions(-) diff --git a/packet-store.js b/packet-store.js index 82ba002..f32b231 100644 --- a/packet-store.js +++ b/packet-store.js @@ -1,8 +1,9 @@ 'use strict'; /** - * In-memory packet store — loads all packets from SQLite on startup, + * In-memory packet store — loads transmissions + observations from SQLite on startup, * serves reads from RAM, writes to both RAM + SQLite. + * M3: Restructured around transmissions (deduped by hash) with observations. * Caps memory at configurable limit (default 1GB). */ class PacketStore { @@ -16,17 +17,22 @@ class PacketStore { // SQLite-only mode: skip RAM loading, all reads go to DB this.sqliteOnly = process.env.NO_MEMORY_STORE === '1'; - // Core storage: array sorted by timestamp DESC (newest first) + // Primary storage: transmissions sorted by first_seen DESC (newest first) + // Each transmission looks like a packet for backward compat this.packets = []; + // Indexes - this.byId = new Map(); - this.byHash = new Map(); // hash → [packet, ...] - this.byObserver = new Map(); // observer_id → [packet, ...] - this.byNode = new Map(); // pubkey → [packet, ...] - this.byTransmission = new Map(); // hash → {id, hash, first_seen, payload_type, decoded_json, observations: []} + this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links) + this.byHash = new Map(); // hash → transmission object (1:1) + this.byObserver = new Map(); // observer_id → [observation objects] + this.byNode = new Map(); // pubkey → [transmission objects] (deduped) + this.byTransmission = new Map(); // hash → transmission object (same refs as byHash) + + // Track which hashes are indexed per node pubkey (avoid dupes in byNode) + this._nodeHashIndex = new Map(); // pubkey → Set this.loaded = false; - this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 }; + this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 }; } /** Load all packets from SQLite into memory */ @@ -36,88 +42,310 @@ class PacketStore { this.loaded = true; return this; } + const t0 = Date.now(); + + // Check if normalized schema exists + const hasTransmissions = this.db.prepare( + "SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'" + ).get(); + + if (hasTransmissions) { + this._loadNormalized(); + } else { + this._loadLegacy(); + } + + this.stats.totalLoaded = this.packets.length; + this.loaded = true; + const elapsed = Date.now() - t0; + console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); + return this; + } + + /** Load from normalized transmissions + observations tables */ + _loadNormalized() { + const rows = this.db.prepare(` + SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id AS observation_id, o.observer_id, o.observer_name, o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id + ORDER BY t.first_seen DESC, o.timestamp DESC + `).all(); + + for (const row of rows) { + if (this.packets.length >= this.maxPackets && !this.byTransmission.has(row.hash)) break; + + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.transmission_id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.first_seen, + timestamp: row.first_seen, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + // Filled from first observation for backward compat + observer_id: null, + observer_name: null, + snr: null, + rssi: null, + path_json: null, + direction: null, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); + } + + if (row.observation_id != null) { + const obs = { + id: row.observation_id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.obs_timestamp, + // Carry transmission fields for backward compat + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + + tx.observations.push(obs); + tx.observation_count++; + + // Fill first observation data into transmission for backward compat + if (tx.observer_id == null && obs.observer_id) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + tx.direction = obs.direction; + } + + // byId maps observation IDs for packet detail links + this.byId.set(obs.id, obs); + + // byObserver + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + } + } + } + + /** Fallback: load from legacy packets table */ + _loadLegacy() { const rows = this.db.prepare( 'SELECT * FROM packets ORDER BY timestamp DESC' ).all(); for (const row of rows) { if (this.packets.length >= this.maxPackets) break; - this._index(row); - this.packets.push(row); + this._indexLegacy(row); } - - this.stats.totalLoaded = this.packets.length; - this.loaded = true; - const elapsed = Date.now() - t0; - console.log(`[PacketStore] Loaded ${this.packets.length} packets in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); - return this; } - /** Index a packet into all lookup maps */ - _index(pkt) { - this.byId.set(pkt.id, pkt); - - if (pkt.hash) { - if (!this.byHash.has(pkt.hash)) this.byHash.set(pkt.hash, []); - this.byHash.get(pkt.hash).push(pkt); + /** Index a legacy packet row (old flat structure) — builds transmission + observation */ + _indexLegacy(pkt) { + let tx = this.byTransmission.get(pkt.hash); + if (!tx) { + tx = { + id: pkt.id, + raw_hex: pkt.raw_hex, + hash: pkt.hash, + first_seen: pkt.timestamp, + timestamp: pkt.timestamp, + route_type: pkt.route_type, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + observations: [], + observation_count: 0, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + snr: pkt.snr, + rssi: pkt.rssi, + path_json: pkt.path_json, + direction: pkt.direction, + }; + this.byTransmission.set(pkt.hash, tx); + this.byHash.set(pkt.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); } + if (pkt.timestamp < tx.first_seen) { + tx.first_seen = pkt.timestamp; + tx.timestamp = pkt.timestamp; + } + + const obs = { + id: pkt.id, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + direction: pkt.direction, + snr: pkt.snr, + rssi: pkt.rssi, + score: pkt.score, + path_json: pkt.path_json, + timestamp: pkt.timestamp, + hash: pkt.hash, + raw_hex: pkt.raw_hex, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + route_type: pkt.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + this.byId.set(pkt.id, obs); + if (pkt.observer_id) { if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []); - this.byObserver.get(pkt.observer_id).push(pkt); + this.byObserver.get(pkt.observer_id).push(obs); } - // Index by node pubkeys mentioned in decoded_json - this._indexByNode(pkt); - - // Index by transmission (dedup view) - if (pkt.hash) { - if (!this.byTransmission.has(pkt.hash)) { - this.byTransmission.set(pkt.hash, { - id: pkt.id, - hash: pkt.hash, - first_seen: pkt.timestamp, - payload_type: pkt.payload_type, - decoded_json: pkt.decoded_json, - observations: [], - }); - } - const tx = this.byTransmission.get(pkt.hash); - if (pkt.timestamp < tx.first_seen) tx.first_seen = pkt.timestamp; - tx.observations.push({ - id: pkt.id, - observer_id: pkt.observer_id, - observer_name: pkt.observer_name, - direction: pkt.direction, - snr: pkt.snr, - rssi: pkt.rssi, - score: pkt.score, - path_json: pkt.path_json, - timestamp: pkt.timestamp, - }); - } + this.stats.totalObservations++; } - /** Extract node pubkeys/names from decoded_json and index */ - _indexByNode(pkt) { - if (!pkt.decoded_json) return; + /** Extract node pubkeys from decoded_json and index transmission in byNode */ + _indexByNode(tx) { + if (!tx.decoded_json) return; try { - const decoded = JSON.parse(pkt.decoded_json); + const decoded = JSON.parse(tx.decoded_json); const keys = new Set(); if (decoded.pubKey) keys.add(decoded.pubKey); if (decoded.destPubKey) keys.add(decoded.destPubKey); if (decoded.srcPubKey) keys.add(decoded.srcPubKey); for (const k of keys) { + if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set()); + if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed + this._nodeHashIndex.get(k).add(tx.hash); if (!this.byNode.has(k)) this.byNode.set(k, []); - this.byNode.get(k).push(pkt); + this.byNode.get(k).push(tx); } } catch {} } + /** Remove oldest transmissions when over memory limit */ + _evict() { + while (this.packets.length > this.maxPackets) { + const old = this.packets.pop(); + this.byHash.delete(old.hash); + this.byTransmission.delete(old.hash); + // Remove observations from byId and byObserver + for (const obs of old.observations) { + this.byId.delete(obs.id); + if (obs.observer_id && this.byObserver.has(obs.observer_id)) { + const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id); + if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id); + } + } + // Skip node index cleanup (expensive, low value) + this.stats.evicted++; + } + } + + /** Insert a new packet (to both memory and SQLite) */ + insert(packetData) { + const id = this.dbModule.insertPacket(packetData); + const row = this.dbModule.getPacket(id); + if (row && !this.sqliteOnly) { + // Update or create transmission in memory + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.timestamp, + timestamp: row.timestamp, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + observer_id: row.observer_id, + observer_name: row.observer_name, + snr: row.snr, + rssi: row.rssi, + path_json: row.path_json, + direction: row.direction, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.unshift(tx); // newest first + this._indexByNode(tx); + } else { + // Update first_seen if earlier + if (row.timestamp < tx.first_seen) { + tx.first_seen = row.timestamp; + tx.timestamp = row.timestamp; + } + } + + // Add observation + const obs = { + id: row.id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.timestamp, + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + // Update transmission's display fields if this is first observation + if (tx.observations.length === 1) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + } + + this.byId.set(obs.id, obs); + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + this._evict(); + this.stats.inserts++; + } + return id; + } + /** * Find ALL packets referencing a node — by pubkey index + name + pubkey text search. - * Single source of truth for "get packets for node X". + * Returns unique transmissions (deduped). * @param {string} nodeIdOrName - pubkey or friendly name * @param {Array} [fromPackets] - packet array to filter (defaults to this.packets) * @returns {{ packets: Array, pubkey: string, nodeName: string }} @@ -132,49 +360,24 @@ class PacketStore { if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; } } catch {} - // Combine: index hits + text search by both name and pubkey + // Combine: index hits + text search const indexed = this.byNode.get(pubkey); - const idSet = indexed ? new Set(indexed.map(p => p.id)) : new Set(); + const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set(); const source = fromPackets || this.packets; - const packets = source.filter(p => - idSet.has(p.id) || - (p.decoded_json && (p.decoded_json.includes(nodeName) || p.decoded_json.includes(pubkey))) + const packets = source.filter(t => + hashSet.has(t.hash) || + (t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey))) ); return { packets, pubkey, nodeName }; } - /** Remove oldest packets when over memory limit */ - _evict() { - while (this.packets.length > this.maxPackets) { - const old = this.packets.pop(); - this.byId.delete(old.id); - // Remove from hash index - if (old.hash && this.byHash.has(old.hash)) { - const arr = this.byHash.get(old.hash).filter(p => p.id !== old.id); - if (arr.length) this.byHash.set(old.hash, arr); else this.byHash.delete(old.hash); - } - // Remove from observer index - if (old.observer_id && this.byObserver.has(old.observer_id)) { - const arr = this.byObserver.get(old.observer_id).filter(p => p.id !== old.id); - if (arr.length) this.byObserver.set(old.observer_id, arr); else this.byObserver.delete(old.observer_id); - } - // Skip node index cleanup for eviction (expensive, low value) - this.stats.evicted++; - } - } - - /** Insert a new packet (to both memory and SQLite) */ - insert(packetData) { - const id = this.dbModule.insertPacket(packetData); - const row = this.dbModule.getPacket(id); - if (row) { - this.packets.unshift(row); // newest first - this._index(row); - this._evict(); - this.stats.inserts++; - } - return id; + /** Count transmissions and observations for a node */ + countForNode(pubkey) { + const txs = this.byNode.get(pubkey) || []; + let observations = 0; + for (const tx of txs) observations += tx.observation_count; + return { transmissions: txs.length, observations }; } /** Query packets with filters — all from memory (or SQLite in fallback mode) */ @@ -187,9 +390,11 @@ class PacketStore { // Use indexes for single-key filters when possible if (hash && !type && !route && !region && !observer && !since && !until && !node) { - results = this.byHash.get(hash) || []; + const tx = this.byHash.get(hash); + results = tx ? [tx] : []; } else if (observer && !type && !route && !region && !hash && !since && !until && !node) { - results = this.byObserver.get(observer) || []; + // For observer filter, find unique transmissions where any observation matches + results = this._transmissionsForObserver(observer); } else if (node && !type && !route && !region && !observer && !hash && !since && !until) { results = this.findPacketsForNode(node).packets; } else { @@ -202,18 +407,22 @@ class PacketStore { const r = Number(route); results = results.filter(p => p.route_type === r); } - if (observer) results = results.filter(p => p.observer_id === observer); - if (hash) results = results.filter(p => p.hash === hash); + if (observer) results = this._transmissionsForObserver(observer, results); + if (hash) { + const tx = this.byHash.get(hash); + results = tx ? results.filter(p => p.hash === hash) : []; + } if (since) results = results.filter(p => p.timestamp > since); if (until) results = results.filter(p => p.timestamp < until); if (region) { - // Need to look up observers for this region const regionObservers = new Set(); try { const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region); obs.forEach(o => regionObservers.add(o.id)); } catch {} - results = results.filter(p => regionObservers.has(p.observer_id)); + results = results.filter(p => + p.observations.some(o => regionObservers.has(o.observer_id)) + ); } if (node) { results = this.findPacketsForNode(node, results).packets; @@ -237,52 +446,52 @@ class PacketStore { return { packets: paginated, total }; } - /** Query with groupByHash — aggregate packets by content hash */ + /** Find unique transmissions that have at least one observation from given observer */ + _transmissionsForObserver(observerId, fromTransmissions) { + if (fromTransmissions) { + return fromTransmissions.filter(tx => + tx.observations.some(o => o.observer_id === observerId) + ); + } + // Use byObserver index: get observations, then unique transmissions + const obs = this.byObserver.get(observerId) || []; + const seen = new Set(); + const result = []; + for (const o of obs) { + if (!seen.has(o.hash)) { + seen.add(o.hash); + const tx = this.byTransmission.get(o.hash); + if (tx) result.push(tx); + } + } + return result; + } + + /** Query with groupByHash — now trivial since packets ARE transmissions */ queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) { this.stats.queries++; if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node }); - // Get filtered results first + // Get filtered transmissions const { packets: filtered, total: filteredTotal } = this.query({ limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node }); - // Group by hash - const groups = new Map(); - for (const p of filtered) { - const h = p.hash || p.id.toString(); - if (!groups.has(h)) { - groups.set(h, { - hash: p.hash, - observer_count: new Set(), - count: 0, - latest: p.timestamp, - observer_id: p.observer_id, - observer_name: p.observer_name, - path_json: p.path_json, - payload_type: p.payload_type, - raw_hex: p.raw_hex, - decoded_json: p.decoded_json, - }); - } - const g = groups.get(h); - g.count++; - if (p.observer_id) g.observer_count.add(p.observer_id); - if (p.timestamp > g.latest) { - g.latest = p.timestamp; - } - // Keep longest path - if (p.path_json && (!g.path_json || p.path_json.length > g.path_json.length)) { - g.path_json = p.path_json; - g.raw_hex = p.raw_hex; - } - } - - // Sort by latest DESC, paginate - const sorted = [...groups.values()] - .map(g => ({ ...g, observer_count: g.observer_count.size })) - .sort((a, b) => b.latest.localeCompare(a.latest)); + // Already grouped by hash — just format for backward compat + const sorted = filtered.map(tx => ({ + hash: tx.hash, + count: tx.observation_count, + observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size, + latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp, + observer_id: tx.observer_id, + observer_name: tx.observer_name, + path_json: tx.path_json, + payload_type: tx.payload_type, + raw_hex: tx.raw_hex, + decoded_json: tx.decoded_json, + observation_count: tx.observation_count, + })).sort((a, b) => b.latest.localeCompare(a.latest)); const total = sorted.length; const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit)); @@ -302,25 +511,26 @@ class PacketStore { return results.reverse(); } - /** Get a single packet by ID */ + /** Get a single packet by ID — checks observation IDs first (backward compat) */ getById(id) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE id = ?').get(id) || null; return this.byId.get(id) || null; } - /** Get all siblings of a packet (same hash) */ + /** Get all siblings of a packet (same hash) — returns observations array */ getSiblings(hash) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE hash = ? ORDER BY timestamp DESC').all(hash); - return this.byHash.get(hash) || []; + const tx = this.byTransmission.get(hash); + return tx ? tx.observations : []; } - /** Get all packets (raw array reference — do not mutate) */ + /** Get all transmissions (backward compat — returns packets array) */ all() { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all(); return this.packets; } - /** Get all packets matching a filter function */ + /** Get all transmissions matching a filter function */ filter(fn) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all().filter(fn); return this.packets.filter(fn); From aa35164252e5cd08df8dcd5efff2ed61a39a7cf7 Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 20:49:34 +0000 Subject: [PATCH 5/9] M4: API response changes for dedup-normalize - GET /api/packets: returns transmissions with observation_count, strip observations[] by default (use ?expand=observations to include) - GET /api/packets/:id: includes observation_count and observations[] - GET /api/nodes/:pubkey/health: stats.totalTransmissions + totalObservations (totalPackets kept for backward compat) - GET /api/nodes/bulk-health: same transmission/observation split - WebSocket broadcast: includes observation_count - db.js getStats(): adds totalTransmissions count - All backward-compatible: old field names preserved alongside new ones --- db.js | 7 +++++++ server.js | 52 +++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/db.js b/db.js index fb36d56..3691a68 100644 --- a/db.js +++ b/db.js @@ -364,8 +364,15 @@ function getObservers() { function getStats() { const oneHourAgo = new Date(Date.now() - 3600000).toISOString(); + // Try to get transmission count from normalized schema + let totalTransmissions = null; + try { + totalTransmissions = db.prepare('SELECT COUNT(*) as count FROM transmissions').get().count; + } catch {} return { totalPackets: stmts.countPackets.get().count, + totalTransmissions, + totalObservations: stmts.countPackets.get().count, // legacy packets = observations totalNodes: stmts.countNodes.get().count, totalObservers: stmts.countObservers.get().count, packetsLastHour: stmts.countRecentPackets.get(oneHourAgo).count, diff --git a/server.js b/server.js index 5d8f3bc..bc3b355 100644 --- a/server.js +++ b/server.js @@ -562,7 +562,9 @@ for (const source of mqttSources) { cache.debouncedInvalidateAll(); const fullPacket = pktStore.getById(packetId); - const broadcastData = { id: packetId, raw: msg.raw, decoded, snr: msg.SNR, rssi: msg.RSSI, hash: msg.hash, observer: observerId, packet: fullPacket }; + const tx = pktStore.byTransmission.get(pktData.hash); + const observation_count = tx ? tx.observation_count : 1; + const broadcastData = { id: packetId, raw: msg.raw, decoded, snr: msg.SNR, rssi: msg.RSSI, hash: msg.hash, observer: observerId, packet: fullPacket, observation_count }; broadcast({ type: 'packet', data: broadcastData }); if (decoded.header.payloadTypeName === 'GRP_TXT') { @@ -749,11 +751,23 @@ app.get('/api/packets', (req, res) => { return res.json({ packets: paged, total, limit: Number(limit), offset: Number(offset) }); } + // groupByHash is now the default behavior (transmissions ARE grouped) — keep param for compat if (groupByHash === 'true') { return res.json(pktStore.queryGrouped({ limit, offset, type, route, region, observer, hash, since, until, node })); } - res.json(pktStore.query({ limit, offset, type, route, region, observer, hash, since, until, node, order })); + const expand = req.query.expand; + const result = pktStore.query({ limit, offset, type, route, region, observer, hash, since, until, node, order }); + + // Strip observations[] from default response for bandwidth; include with ?expand=observations + if (expand !== 'observations') { + result.packets = result.packets.map(p => { + const { observations, ...rest } = p; + return rest; + }); + } + + res.json(result); }); // Lightweight endpoint: just timestamps for timeline sparkline @@ -784,7 +798,12 @@ app.get('/api/packets/:id', (req, res) => { // Build byte breakdown const breakdown = buildBreakdown(packet.raw_hex, decoded); - res.json({ packet, path: pathHops, breakdown }); + // Include sibling observations for this transmission + const transmission = packet.hash ? pktStore.byTransmission.get(packet.hash) : null; + const siblingObservations = transmission ? transmission.observations : []; + const observation_count = transmission ? transmission.observation_count : 1; + + res.json({ packet, path: pathHops, breakdown, observation_count, observations: siblingObservations }); }); function buildBreakdown(rawHex, decoded) { @@ -966,10 +985,12 @@ app.get('/api/nodes/bulk-health', (req, res) => { const results = []; for (const node of nodes) { const packets = pktStore.byNode.get(node.public_key) || []; - let totalPackets = packets.length, packetsToday = 0, snrSum = 0, snrCount = 0, lastHeard = null; + let packetsToday = 0, snrSum = 0, snrCount = 0, lastHeard = null; const observers = {}; + let totalObservations = 0; for (const pkt of packets) { + totalObservations += pkt.observation_count || 1; if (pkt.timestamp > todayISO) packetsToday++; if (pkt.snr != null) { snrSum += pkt.snr; snrCount++; } if (!lastHeard || pkt.timestamp > lastHeard) lastHeard = pkt.timestamp; @@ -996,7 +1017,12 @@ app.get('/api/nodes/bulk-health', (req, res) => { results.push({ public_key: node.public_key, name: node.name, role: node.role, lat: node.lat, lon: node.lon, - stats: { totalPackets, packetsToday, avgSnr: snrCount ? snrSum / snrCount : null, lastHeard }, + stats: { + totalTransmissions: packets.length, + totalObservations, + totalPackets: packets.length, // backward compat + packetsToday, avgSnr: snrCount ? snrSum / snrCount : null, lastHeard + }, observers: observerRows }); } @@ -1864,10 +1890,22 @@ app.get('/api/nodes/:pubkey/health', (req, res) => { const recentPackets = packets.slice(0, 20); + // Count transmissions vs observations + const counts = pktStore.countForNode(pubkey); + const recentWithoutObs = recentPackets.map(p => { + const { observations, ...rest } = p; + return { ...rest, observation_count: p.observation_count || 1 }; + }); + const result = { node: node.node || node, observers, - stats: { totalPackets: packets.length, packetsToday, avgSnr: snrN ? snrSum / snrN : null, avgHops: hopCount > 0 ? Math.round(totalHops / hopCount) : 0, lastHeard }, - recentPackets + stats: { + totalTransmissions: counts.transmissions, + totalObservations: counts.observations, + totalPackets: counts.transmissions, // backward compat + packetsToday, avgSnr: snrN ? snrSum / snrN : null, avgHops: hopCount > 0 ? Math.round(totalHops / hopCount) : 0, lastHeard + }, + recentPackets: recentWithoutObs }; cache.set(_ck, result, TTL.nodeHealth); res.json(result); From a882aae6810384ccc6642eabc2bb169fac65c059 Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 21:31:10 +0000 Subject: [PATCH 6/9] =?UTF-8?q?M5:=20Frontend=20updates=20for=20dedup=20?= =?UTF-8?q?=E2=80=94=20observation=5Fcount=20badges,=20totalTransmissions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - packets.js: Show observation_count badge (👁 N) on grouped rows - nodes.js: Use totalTransmissions (fallback totalPackets), show observation badges on recent packets - home.js: Use totalTransmissions for network stats - node-analytics.js: Use totalTransmissions for throughput display - analytics.js: Use totalTransmissions for overview stats and node rankings - live.js: Use totalTransmissions in node detail, show observation badges in feed and recent packets - style.css: Add .badge-obs style for observation count badges - index.html: Bump cache busters on all changed JS/CSS files All changes have backward compat fallbacks to totalPackets. --- public/analytics.js | 12 ++++++------ public/home.js | 2 +- public/index.html | 14 +++++++------- public/live.js | 7 ++++--- public/node-analytics.js | 2 +- public/nodes.js | 8 +++++--- public/packets.js | 3 ++- public/style.css | 5 +++++ 8 files changed, 31 insertions(+), 22 deletions(-) diff --git a/public/analytics.js b/public/analytics.js index f0440bd..d1c1bee 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -150,13 +150,13 @@ el.innerHTML = `
-
${(rf.totalAllPackets || rf.totalPackets).toLocaleString()}
-
Total Packets
+
${(rf.totalTransmissions || rf.totalAllPackets || rf.totalPackets).toLocaleString()}
+
Total Transmissions
${sparkSvg(rf.packetsPerHour.map(h=>h.count), 'var(--accent)')}
${rf.totalPackets.toLocaleString()}
-
With Signal Data
+
Observations with Signal
${topo.uniqueNodes}
@@ -1167,7 +1167,7 @@ const enriched = nodes.filter(n => healthMap[n.public_key]).map(n => ({ ...n, health: { stats: healthMap[n.public_key].stats, observers: healthMap[n.public_key].observers } })); // Compute rankings - const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalPackets || 0) - (a.health.stats.totalPackets || 0)); + const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalTransmissions || b.health.stats.totalPackets || 0) - (a.health.stats.totalTransmissions || a.health.stats.totalPackets || 0)); const bySnr = [...enriched].filter(n => n.health.stats.avgSnr != null).sort((a, b) => b.health.stats.avgSnr - a.health.stats.avgSnr); const byObservers = [...enriched].sort((a, b) => (b.health.observers?.length || 0) - (a.health.observers?.length || 0)); const byRecent = [...enriched].filter(n => n.health.stats.lastHeard).sort((a, b) => new Date(b.health.stats.lastHeard) - new Date(a.health.stats.lastHeard)); @@ -1223,7 +1223,7 @@ return ` ${nodeLink(n)} ${n.role} - ${s.totalPackets || 0} + ${s.totalTransmissions || s.totalPackets || 0} ${s.avgSnr != null ? s.avgSnr.toFixed(1) + ' dB' : '—'} ${n.health.observers?.length || 0} ${s.lastHeard ? timeAgo(s.lastHeard) : '—'} @@ -1240,7 +1240,7 @@ ${i + 1} ${nodeLink(n)}${claimedBadge(n)} ${n.role} - ${n.health.stats.totalPackets || 0} + ${n.health.stats.totalTransmissions || n.health.stats.totalPackets || 0} ${n.health.stats.packetsToday || 0} 📊 `).join('')} diff --git a/public/home.js b/public/home.js index e75fa5e..b07fb35 100644 --- a/public/home.js +++ b/public/home.js @@ -373,7 +373,7 @@ const el = document.getElementById('homeStats'); if (!el) return; el.innerHTML = ` -
${s.totalPackets ?? '—'}
Packets
+
${s.totalTransmissions ?? s.totalPackets ?? '—'}
Transmissions
${s.totalNodes ?? '—'}
Nodes
${s.totalObservers ?? '—'}
Observers
${s.packetsLast24h ?? '—'}
Last 24h
diff --git a/public/index.html b/public/index.html index c64cf4a..34d403e 100644 --- a/public/index.html +++ b/public/index.html @@ -22,7 +22,7 @@ - + - - + + - + - - + + - + diff --git a/public/live.js b/public/live.js index 4f0e72f..8a5eb91 100644 --- a/public/live.js +++ b/public/live.js @@ -1020,7 +1020,7 @@ ${hasLoc ? `Location${n.lat.toFixed(5)}, ${n.lon.toFixed(5)}` : ''} ${stats.avgSnr != null ? `Avg SNR${stats.avgSnr.toFixed(1)} dB` : ''} ${stats.avgHops != null ? `Avg Hops${stats.avgHops.toFixed(1)}` : ''} - ${stats.totalPackets ? `Total Packets${stats.totalPackets}` : ''} + ${stats.totalTransmissions || stats.totalPackets ? `Total Packets${stats.totalTransmissions || stats.totalPackets}` : ''} `; if (observers.length) { @@ -1034,7 +1034,7 @@ html += `

Recent Packets

` + recent.slice(0, 10).map(p => `
- ${escapeHtml(p.payload_type || '?')} + ${escapeHtml(p.payload_type || '?')}${p.observation_count > 1 ? ' 👁 ' + p.observation_count + '' : ''} ${p.timestamp ? timeAgo(p.timestamp) : '—'}
`).join('') + '
'; @@ -1474,6 +1474,7 @@ const text = payload.text || payload.name || ''; const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : ''; const hopStr = hops.length ? `${hops.length}⇢` : ''; + const obsBadge = pkt.observation_count > 1 ? `👁 ${pkt.observation_count}` : ''; const item = document.createElement('div'); item.className = 'live-feed-item live-feed-enter'; @@ -1483,7 +1484,7 @@ item.innerHTML = ` ${icon} ${typeName} - ${hopStr} + ${hopStr}${obsBadge} ${escapeHtml(preview)} ${new Date(pkt._ts || Date.now()).toLocaleTimeString([], {hour:'2-digit',minute:'2-digit',second:'2-digit'})} `; diff --git a/public/node-analytics.js b/public/node-analytics.js index 20d9a8e..db87bc2 100644 --- a/public/node-analytics.js +++ b/public/node-analytics.js @@ -55,7 +55,7 @@
← Back to ${nodeName}

📊 ${nodeName} — Analytics

-
${n.role || 'Unknown role'} · ${s.totalPackets} packets in ${days}d window
+
${n.role || 'Unknown role'} · ${s.totalTransmissions || s.totalPackets} packets in ${days}d window
diff --git a/public/nodes.js b/public/nodes.js index 11c07b3..a9f2621 100644 --- a/public/nodes.js +++ b/public/nodes.js @@ -128,7 +128,7 @@
Last Heard
${lastHeard ? timeAgo(lastHeard) : (n.last_seen ? timeAgo(n.last_seen) : '—')}
First Seen
${n.first_seen ? new Date(n.first_seen).toLocaleString() : '—'}
-
Total Packets
${stats.totalPackets || n.advert_count || 0}
+
Total Packets
${stats.totalTransmissions || stats.totalPackets || n.advert_count || 0}${stats.totalObservations && stats.totalObservations !== (stats.totalTransmissions || stats.totalPackets) ? ' (seen ' + stats.totalObservations + '×)' : ''}
Packets Today
${stats.packetsToday || 0}
${stats.avgSnr != null ? `
Avg SNR
${stats.avgSnr.toFixed(1)} dB
` : ''} ${stats.avgHops ? `
Avg Hops
${stats.avgHops}
` : ''} @@ -161,9 +161,10 @@ const obs = p.observer_name || p.observer_id; const snr = p.snr != null ? ` · SNR ${p.snr}dB` : ''; const rssi = p.rssi != null ? ` · RSSI ${p.rssi}dBm` : ''; + const obsBadge = p.observation_count > 1 ? ` 👁 ${p.observation_count}` : ''; return `
${timeAgo(p.timestamp)} - ${typeLabel}${detail}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi} + ${typeLabel}${detail}${obsBadge}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi} Analyze →
`; }).join('') : '
No recent packets
'} @@ -426,7 +427,7 @@ const role = (n.role || '').toLowerCase(); const { degradedMs, silentMs } = getHealthThresholds(role); const statusLabel = statusAge < degradedMs ? '🟢 Active' : statusAge < silentMs ? '🟡 Degraded' : '🔴 Silent'; - const totalPackets = stats.totalPackets || n.advert_count || 0; + const totalPackets = stats.totalTransmissions || stats.totalPackets || n.advert_count || 0; panel.innerHTML = `
@@ -480,6 +481,7 @@ `; From 95b59d17920561cac6e6a0a2e0e6195a06141518 Mon Sep 17 00:00:00 2001 From: you Date: Fri, 20 Mar 2026 22:34:00 +0000 Subject: [PATCH 9/9] fix: recent packets deeplink uses route path not query param --- public/index.html | 2 +- public/live.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/public/index.html b/public/index.html index 51ae3e6..b403298 100644 --- a/public/index.html +++ b/public/index.html @@ -88,7 +88,7 @@ - + diff --git a/public/live.js b/public/live.js index 30a24f3..7039d90 100644 --- a/public/live.js +++ b/public/live.js @@ -1034,7 +1034,7 @@ html += `

Recent Packets

';