diff --git a/db.js b/db.js index 9083b2f..3691a68 100644 --- a/db.js +++ b/db.js @@ -71,6 +71,41 @@ db.exec(` CREATE INDEX IF NOT EXISTS idx_packets_payload_type ON packets(payload_type); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); CREATE INDEX IF NOT EXISTS idx_observers_last_seen ON observers(last_seen); + + CREATE TABLE IF NOT EXISTS transmissions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + raw_hex TEXT NOT NULL, + hash TEXT NOT NULL UNIQUE, + first_seen TEXT NOT NULL, + route_type INTEGER, + payload_type INTEGER, + payload_version INTEGER, + decoded_json TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transmission_id INTEGER NOT NULL REFERENCES transmissions(id), + hash TEXT NOT NULL, + observer_id TEXT, + observer_name TEXT, + direction TEXT, + snr REAL, + rssi REAL, + score INTEGER, + path_json TEXT, + timestamp TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE INDEX IF NOT EXISTS idx_transmissions_hash ON transmissions(hash); + CREATE INDEX IF NOT EXISTS idx_transmissions_first_seen ON transmissions(first_seen); + CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type); + CREATE INDEX IF NOT EXISTS idx_observations_hash ON observations(hash); + CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id); + CREATE INDEX IF NOT EXISTS idx_observations_observer_id ON observations(observer_id); + CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp); `); // --- Migrations for existing DBs --- @@ -144,6 +179,16 @@ const stmts = { countNodes: db.prepare(`SELECT COUNT(*) as count FROM nodes`), countObservers: db.prepare(`SELECT COUNT(*) as count FROM observers`), countRecentPackets: db.prepare(`SELECT COUNT(*) as count FROM packets WHERE timestamp > ?`), + getTransmissionByHash: db.prepare(`SELECT id, first_seen FROM transmissions WHERE hash = ?`), + insertTransmission: db.prepare(` + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) + VALUES (@raw_hex, @hash, @first_seen, @route_type, @payload_type, @payload_version, @decoded_json) + `), + updateTransmissionFirstSeen: db.prepare(`UPDATE transmissions SET first_seen = @first_seen WHERE id = @id`), + insertObservation: db.prepare(` + INSERT INTO observations (transmission_id, hash, observer_id, observer_name, direction, snr, rssi, score, path_json, timestamp) + VALUES (@transmission_id, @hash, @observer_id, @observer_name, @direction, @snr, @rssi, @score, @path_json, @timestamp) + `), }; // --- Helper functions --- @@ -168,6 +213,49 @@ function insertPacket(data) { return stmts.insertPacket.run(d).lastInsertRowid; } +function insertTransmission(data) { + const hash = data.hash; + if (!hash) return null; // Can't deduplicate without a hash + + const timestamp = data.timestamp || new Date().toISOString(); + let transmissionId; + + const existing = stmts.getTransmissionByHash.get(hash); + if (existing) { + transmissionId = existing.id; + // Update first_seen if this observation is earlier + if (timestamp < existing.first_seen) { + stmts.updateTransmissionFirstSeen.run({ id: transmissionId, first_seen: timestamp }); + } + } else { + const result = stmts.insertTransmission.run({ + raw_hex: data.raw_hex || '', + hash, + first_seen: timestamp, + route_type: data.route_type ?? null, + payload_type: data.payload_type ?? null, + payload_version: data.payload_version ?? null, + decoded_json: data.decoded_json || null, + }); + transmissionId = result.lastInsertRowid; + } + + const obsResult = stmts.insertObservation.run({ + transmission_id: transmissionId, + hash, + observer_id: data.observer_id || null, + observer_name: data.observer_name || null, + direction: data.direction || null, + snr: data.snr ?? null, + rssi: data.rssi ?? null, + score: data.score ?? null, + path_json: data.path_json || null, + timestamp, + }); + + return { transmissionId, observationId: obsResult.lastInsertRowid }; +} + function insertPath(packetId, hops) { const tx = db.transaction((hops) => { for (let i = 0; i < hops.length; i++) { @@ -276,8 +364,15 @@ function getObservers() { function getStats() { const oneHourAgo = new Date(Date.now() - 3600000).toISOString(); + // Try to get transmission count from normalized schema + let totalTransmissions = null; + try { + totalTransmissions = db.prepare('SELECT COUNT(*) as count FROM transmissions').get().count; + } catch {} return { totalPackets: stmts.countPackets.get().count, + totalTransmissions, + totalObservations: stmts.countPackets.get().count, // legacy packets = observations totalNodes: stmts.countNodes.get().count, totalObservers: stmts.countObservers.get().count, packetsLastHour: stmts.countRecentPackets.get(oneHourAgo).count, @@ -557,4 +652,4 @@ function getNodeAnalytics(pubkey, days) { }; } -module.exports = { db, insertPacket, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics }; +module.exports = { db, insertPacket, insertTransmission, insertPath, upsertNode, upsertObserver, updateObserverStatus, getPackets, getPacket, getNodes, getNode, getObservers, getStats, seed, searchNodes, getNodeHealth, getNodeAnalytics }; diff --git a/packet-store.js b/packet-store.js index 4757f8f..f32b231 100644 --- a/packet-store.js +++ b/packet-store.js @@ -1,8 +1,9 @@ 'use strict'; /** - * In-memory packet store — loads all packets from SQLite on startup, + * In-memory packet store — loads transmissions + observations from SQLite on startup, * serves reads from RAM, writes to both RAM + SQLite. + * M3: Restructured around transmissions (deduped by hash) with observations. * Caps memory at configurable limit (default 1GB). */ class PacketStore { @@ -16,16 +17,22 @@ class PacketStore { // SQLite-only mode: skip RAM loading, all reads go to DB this.sqliteOnly = process.env.NO_MEMORY_STORE === '1'; - // Core storage: array sorted by timestamp DESC (newest first) + // Primary storage: transmissions sorted by first_seen DESC (newest first) + // Each transmission looks like a packet for backward compat this.packets = []; + // Indexes - this.byId = new Map(); - this.byHash = new Map(); // hash → [packet, ...] - this.byObserver = new Map(); // observer_id → [packet, ...] - this.byNode = new Map(); // pubkey → [packet, ...] + this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links) + this.byHash = new Map(); // hash → transmission object (1:1) + this.byObserver = new Map(); // observer_id → [observation objects] + this.byNode = new Map(); // pubkey → [transmission objects] (deduped) + this.byTransmission = new Map(); // hash → transmission object (same refs as byHash) + + // Track which hashes are indexed per node pubkey (avoid dupes in byNode) + this._nodeHashIndex = new Map(); // pubkey → Set this.loaded = false; - this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 }; + this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 }; } /** Load all packets from SQLite into memory */ @@ -35,61 +42,310 @@ class PacketStore { this.loaded = true; return this; } + const t0 = Date.now(); + + // Check if normalized schema exists + const hasTransmissions = this.db.prepare( + "SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'" + ).get(); + + if (hasTransmissions) { + this._loadNormalized(); + } else { + this._loadLegacy(); + } + + this.stats.totalLoaded = this.packets.length; + this.loaded = true; + const elapsed = Date.now() - t0; + console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); + return this; + } + + /** Load from normalized transmissions + observations tables */ + _loadNormalized() { + const rows = this.db.prepare(` + SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id AS observation_id, o.observer_id, o.observer_name, o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id + ORDER BY t.first_seen DESC, o.timestamp DESC + `).all(); + + for (const row of rows) { + if (this.packets.length >= this.maxPackets && !this.byTransmission.has(row.hash)) break; + + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.transmission_id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.first_seen, + timestamp: row.first_seen, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + // Filled from first observation for backward compat + observer_id: null, + observer_name: null, + snr: null, + rssi: null, + path_json: null, + direction: null, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); + } + + if (row.observation_id != null) { + const obs = { + id: row.observation_id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.obs_timestamp, + // Carry transmission fields for backward compat + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + + tx.observations.push(obs); + tx.observation_count++; + + // Fill first observation data into transmission for backward compat + if (tx.observer_id == null && obs.observer_id) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + tx.direction = obs.direction; + } + + // byId maps observation IDs for packet detail links + this.byId.set(obs.id, obs); + + // byObserver + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + } + } + } + + /** Fallback: load from legacy packets table */ + _loadLegacy() { const rows = this.db.prepare( 'SELECT * FROM packets ORDER BY timestamp DESC' ).all(); for (const row of rows) { if (this.packets.length >= this.maxPackets) break; - this._index(row); - this.packets.push(row); + this._indexLegacy(row); } - - this.stats.totalLoaded = this.packets.length; - this.loaded = true; - const elapsed = Date.now() - t0; - console.log(`[PacketStore] Loaded ${this.packets.length} packets in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); - return this; } - /** Index a packet into all lookup maps */ - _index(pkt) { - this.byId.set(pkt.id, pkt); - - if (pkt.hash) { - if (!this.byHash.has(pkt.hash)) this.byHash.set(pkt.hash, []); - this.byHash.get(pkt.hash).push(pkt); + /** Index a legacy packet row (old flat structure) — builds transmission + observation */ + _indexLegacy(pkt) { + let tx = this.byTransmission.get(pkt.hash); + if (!tx) { + tx = { + id: pkt.id, + raw_hex: pkt.raw_hex, + hash: pkt.hash, + first_seen: pkt.timestamp, + timestamp: pkt.timestamp, + route_type: pkt.route_type, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + observations: [], + observation_count: 0, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + snr: pkt.snr, + rssi: pkt.rssi, + path_json: pkt.path_json, + direction: pkt.direction, + }; + this.byTransmission.set(pkt.hash, tx); + this.byHash.set(pkt.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); } + if (pkt.timestamp < tx.first_seen) { + tx.first_seen = pkt.timestamp; + tx.timestamp = pkt.timestamp; + } + + const obs = { + id: pkt.id, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + direction: pkt.direction, + snr: pkt.snr, + rssi: pkt.rssi, + score: pkt.score, + path_json: pkt.path_json, + timestamp: pkt.timestamp, + hash: pkt.hash, + raw_hex: pkt.raw_hex, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + route_type: pkt.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + this.byId.set(pkt.id, obs); + if (pkt.observer_id) { if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []); - this.byObserver.get(pkt.observer_id).push(pkt); + this.byObserver.get(pkt.observer_id).push(obs); } - // Index by node pubkeys mentioned in decoded_json - this._indexByNode(pkt); + this.stats.totalObservations++; } - /** Extract node pubkeys/names from decoded_json and index */ - _indexByNode(pkt) { - if (!pkt.decoded_json) return; + /** Extract node pubkeys from decoded_json and index transmission in byNode */ + _indexByNode(tx) { + if (!tx.decoded_json) return; try { - const decoded = JSON.parse(pkt.decoded_json); + const decoded = JSON.parse(tx.decoded_json); const keys = new Set(); if (decoded.pubKey) keys.add(decoded.pubKey); if (decoded.destPubKey) keys.add(decoded.destPubKey); if (decoded.srcPubKey) keys.add(decoded.srcPubKey); for (const k of keys) { + if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set()); + if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed + this._nodeHashIndex.get(k).add(tx.hash); if (!this.byNode.has(k)) this.byNode.set(k, []); - this.byNode.get(k).push(pkt); + this.byNode.get(k).push(tx); } } catch {} } + /** Remove oldest transmissions when over memory limit */ + _evict() { + while (this.packets.length > this.maxPackets) { + const old = this.packets.pop(); + this.byHash.delete(old.hash); + this.byTransmission.delete(old.hash); + // Remove observations from byId and byObserver + for (const obs of old.observations) { + this.byId.delete(obs.id); + if (obs.observer_id && this.byObserver.has(obs.observer_id)) { + const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id); + if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id); + } + } + // Skip node index cleanup (expensive, low value) + this.stats.evicted++; + } + } + + /** Insert a new packet (to both memory and SQLite) */ + insert(packetData) { + const id = this.dbModule.insertPacket(packetData); + const row = this.dbModule.getPacket(id); + if (row && !this.sqliteOnly) { + // Update or create transmission in memory + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.timestamp, + timestamp: row.timestamp, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + observer_id: row.observer_id, + observer_name: row.observer_name, + snr: row.snr, + rssi: row.rssi, + path_json: row.path_json, + direction: row.direction, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.unshift(tx); // newest first + this._indexByNode(tx); + } else { + // Update first_seen if earlier + if (row.timestamp < tx.first_seen) { + tx.first_seen = row.timestamp; + tx.timestamp = row.timestamp; + } + } + + // Add observation + const obs = { + id: row.id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.timestamp, + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + // Update transmission's display fields if this is first observation + if (tx.observations.length === 1) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + } + + this.byId.set(obs.id, obs); + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + this._evict(); + this.stats.inserts++; + } + return id; + } + /** * Find ALL packets referencing a node — by pubkey index + name + pubkey text search. - * Single source of truth for "get packets for node X". + * Returns unique transmissions (deduped). * @param {string} nodeIdOrName - pubkey or friendly name * @param {Array} [fromPackets] - packet array to filter (defaults to this.packets) * @returns {{ packets: Array, pubkey: string, nodeName: string }} @@ -104,49 +360,24 @@ class PacketStore { if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; } } catch {} - // Combine: index hits + text search by both name and pubkey + // Combine: index hits + text search const indexed = this.byNode.get(pubkey); - const idSet = indexed ? new Set(indexed.map(p => p.id)) : new Set(); + const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set(); const source = fromPackets || this.packets; - const packets = source.filter(p => - idSet.has(p.id) || - (p.decoded_json && (p.decoded_json.includes(nodeName) || p.decoded_json.includes(pubkey))) + const packets = source.filter(t => + hashSet.has(t.hash) || + (t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey))) ); return { packets, pubkey, nodeName }; } - /** Remove oldest packets when over memory limit */ - _evict() { - while (this.packets.length > this.maxPackets) { - const old = this.packets.pop(); - this.byId.delete(old.id); - // Remove from hash index - if (old.hash && this.byHash.has(old.hash)) { - const arr = this.byHash.get(old.hash).filter(p => p.id !== old.id); - if (arr.length) this.byHash.set(old.hash, arr); else this.byHash.delete(old.hash); - } - // Remove from observer index - if (old.observer_id && this.byObserver.has(old.observer_id)) { - const arr = this.byObserver.get(old.observer_id).filter(p => p.id !== old.id); - if (arr.length) this.byObserver.set(old.observer_id, arr); else this.byObserver.delete(old.observer_id); - } - // Skip node index cleanup for eviction (expensive, low value) - this.stats.evicted++; - } - } - - /** Insert a new packet (to both memory and SQLite) */ - insert(packetData) { - const id = this.dbModule.insertPacket(packetData); - const row = this.dbModule.getPacket(id); - if (row) { - this.packets.unshift(row); // newest first - this._index(row); - this._evict(); - this.stats.inserts++; - } - return id; + /** Count transmissions and observations for a node */ + countForNode(pubkey) { + const txs = this.byNode.get(pubkey) || []; + let observations = 0; + for (const tx of txs) observations += tx.observation_count; + return { transmissions: txs.length, observations }; } /** Query packets with filters — all from memory (or SQLite in fallback mode) */ @@ -159,9 +390,11 @@ class PacketStore { // Use indexes for single-key filters when possible if (hash && !type && !route && !region && !observer && !since && !until && !node) { - results = this.byHash.get(hash) || []; + const tx = this.byHash.get(hash); + results = tx ? [tx] : []; } else if (observer && !type && !route && !region && !hash && !since && !until && !node) { - results = this.byObserver.get(observer) || []; + // For observer filter, find unique transmissions where any observation matches + results = this._transmissionsForObserver(observer); } else if (node && !type && !route && !region && !observer && !hash && !since && !until) { results = this.findPacketsForNode(node).packets; } else { @@ -174,18 +407,22 @@ class PacketStore { const r = Number(route); results = results.filter(p => p.route_type === r); } - if (observer) results = results.filter(p => p.observer_id === observer); - if (hash) results = results.filter(p => p.hash === hash); + if (observer) results = this._transmissionsForObserver(observer, results); + if (hash) { + const tx = this.byHash.get(hash); + results = tx ? results.filter(p => p.hash === hash) : []; + } if (since) results = results.filter(p => p.timestamp > since); if (until) results = results.filter(p => p.timestamp < until); if (region) { - // Need to look up observers for this region const regionObservers = new Set(); try { const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region); obs.forEach(o => regionObservers.add(o.id)); } catch {} - results = results.filter(p => regionObservers.has(p.observer_id)); + results = results.filter(p => + p.observations.some(o => regionObservers.has(o.observer_id)) + ); } if (node) { results = this.findPacketsForNode(node, results).packets; @@ -209,52 +446,52 @@ class PacketStore { return { packets: paginated, total }; } - /** Query with groupByHash — aggregate packets by content hash */ + /** Find unique transmissions that have at least one observation from given observer */ + _transmissionsForObserver(observerId, fromTransmissions) { + if (fromTransmissions) { + return fromTransmissions.filter(tx => + tx.observations.some(o => o.observer_id === observerId) + ); + } + // Use byObserver index: get observations, then unique transmissions + const obs = this.byObserver.get(observerId) || []; + const seen = new Set(); + const result = []; + for (const o of obs) { + if (!seen.has(o.hash)) { + seen.add(o.hash); + const tx = this.byTransmission.get(o.hash); + if (tx) result.push(tx); + } + } + return result; + } + + /** Query with groupByHash — now trivial since packets ARE transmissions */ queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) { this.stats.queries++; if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node }); - // Get filtered results first + // Get filtered transmissions const { packets: filtered, total: filteredTotal } = this.query({ limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node }); - // Group by hash - const groups = new Map(); - for (const p of filtered) { - const h = p.hash || p.id.toString(); - if (!groups.has(h)) { - groups.set(h, { - hash: p.hash, - observer_count: new Set(), - count: 0, - latest: p.timestamp, - observer_id: p.observer_id, - observer_name: p.observer_name, - path_json: p.path_json, - payload_type: p.payload_type, - raw_hex: p.raw_hex, - decoded_json: p.decoded_json, - }); - } - const g = groups.get(h); - g.count++; - if (p.observer_id) g.observer_count.add(p.observer_id); - if (p.timestamp > g.latest) { - g.latest = p.timestamp; - } - // Keep longest path - if (p.path_json && (!g.path_json || p.path_json.length > g.path_json.length)) { - g.path_json = p.path_json; - g.raw_hex = p.raw_hex; - } - } - - // Sort by latest DESC, paginate - const sorted = [...groups.values()] - .map(g => ({ ...g, observer_count: g.observer_count.size })) - .sort((a, b) => b.latest.localeCompare(a.latest)); + // Already grouped by hash — just format for backward compat + const sorted = filtered.map(tx => ({ + hash: tx.hash, + count: tx.observation_count, + observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size, + latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp, + observer_id: tx.observer_id, + observer_name: tx.observer_name, + path_json: tx.path_json, + payload_type: tx.payload_type, + raw_hex: tx.raw_hex, + decoded_json: tx.decoded_json, + observation_count: tx.observation_count, + })).sort((a, b) => b.latest.localeCompare(a.latest)); const total = sorted.length; const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit)); @@ -274,25 +511,26 @@ class PacketStore { return results.reverse(); } - /** Get a single packet by ID */ + /** Get a single packet by ID — checks observation IDs first (backward compat) */ getById(id) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE id = ?').get(id) || null; return this.byId.get(id) || null; } - /** Get all siblings of a packet (same hash) */ + /** Get all siblings of a packet (same hash) — returns observations array */ getSiblings(hash) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE hash = ? ORDER BY timestamp DESC').all(hash); - return this.byHash.get(hash) || []; + const tx = this.byTransmission.get(hash); + return tx ? tx.observations : []; } - /** Get all packets (raw array reference — do not mutate) */ + /** Get all transmissions (backward compat — returns packets array) */ all() { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all(); return this.packets; } - /** Get all packets matching a filter function */ + /** Get all transmissions matching a filter function */ filter(fn) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all().filter(fn); return this.packets.filter(fn); @@ -311,6 +549,7 @@ class PacketStore { byHash: this.byHash.size, byObserver: this.byObserver.size, byNode: this.byNode.size, + byTransmission: this.byTransmission.size, } }; } diff --git a/public/analytics.js b/public/analytics.js index f0440bd..d1c1bee 100644 --- a/public/analytics.js +++ b/public/analytics.js @@ -150,13 +150,13 @@ el.innerHTML = `
-
${(rf.totalAllPackets || rf.totalPackets).toLocaleString()}
-
Total Packets
+
${(rf.totalTransmissions || rf.totalAllPackets || rf.totalPackets).toLocaleString()}
+
Total Transmissions
${sparkSvg(rf.packetsPerHour.map(h=>h.count), 'var(--accent)')}
${rf.totalPackets.toLocaleString()}
-
With Signal Data
+
Observations with Signal
${topo.uniqueNodes}
@@ -1167,7 +1167,7 @@ const enriched = nodes.filter(n => healthMap[n.public_key]).map(n => ({ ...n, health: { stats: healthMap[n.public_key].stats, observers: healthMap[n.public_key].observers } })); // Compute rankings - const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalPackets || 0) - (a.health.stats.totalPackets || 0)); + const byPackets = [...enriched].sort((a, b) => (b.health.stats.totalTransmissions || b.health.stats.totalPackets || 0) - (a.health.stats.totalTransmissions || a.health.stats.totalPackets || 0)); const bySnr = [...enriched].filter(n => n.health.stats.avgSnr != null).sort((a, b) => b.health.stats.avgSnr - a.health.stats.avgSnr); const byObservers = [...enriched].sort((a, b) => (b.health.observers?.length || 0) - (a.health.observers?.length || 0)); const byRecent = [...enriched].filter(n => n.health.stats.lastHeard).sort((a, b) => new Date(b.health.stats.lastHeard) - new Date(a.health.stats.lastHeard)); @@ -1223,7 +1223,7 @@ return ` ${nodeLink(n)} ${n.role} - ${s.totalPackets || 0} + ${s.totalTransmissions || s.totalPackets || 0} ${s.avgSnr != null ? s.avgSnr.toFixed(1) + ' dB' : '—'} ${n.health.observers?.length || 0} ${s.lastHeard ? timeAgo(s.lastHeard) : '—'} @@ -1240,7 +1240,7 @@ ${i + 1} ${nodeLink(n)}${claimedBadge(n)} ${n.role} - ${n.health.stats.totalPackets || 0} + ${n.health.stats.totalTransmissions || n.health.stats.totalPackets || 0} ${n.health.stats.packetsToday || 0} 📊 `).join('')} diff --git a/public/home.js b/public/home.js index e75fa5e..b07fb35 100644 --- a/public/home.js +++ b/public/home.js @@ -373,7 +373,7 @@ const el = document.getElementById('homeStats'); if (!el) return; el.innerHTML = ` -
${s.totalPackets ?? '—'}
Packets
+
${s.totalTransmissions ?? s.totalPackets ?? '—'}
Transmissions
${s.totalNodes ?? '—'}
Nodes
${s.totalObservers ?? '—'}
Observers
${s.packetsLast24h ?? '—'}
Last 24h
diff --git a/public/index.html b/public/index.html index c64cf4a..b403298 100644 --- a/public/index.html +++ b/public/index.html @@ -22,7 +22,7 @@ - + - - + + - + - - + + - + diff --git a/public/live.js b/public/live.js index 4f0e72f..7039d90 100644 --- a/public/live.js +++ b/public/live.js @@ -1020,13 +1020,13 @@ ${hasLoc ? `Location${n.lat.toFixed(5)}, ${n.lon.toFixed(5)}` : ''} ${stats.avgSnr != null ? `Avg SNR${stats.avgSnr.toFixed(1)} dB` : ''} ${stats.avgHops != null ? `Avg Hops${stats.avgHops.toFixed(1)}` : ''} - ${stats.totalPackets ? `Total Packets${stats.totalPackets}` : ''} + ${stats.totalTransmissions || stats.totalPackets ? `Total Packets${stats.totalTransmissions || stats.totalPackets}` : ''} `; if (observers.length) { html += `

Heard By

` + - observers.map(o => `
${escapeHtml(o.observer_name || o.observer_id.slice(0, 12))} — ${o.count} pkts
`).join('') + + observers.map(o => `
${escapeHtml(o.observer_name || o.observer_id.slice(0, 12))} — ${o.packetCount || o.count || 0} pkts
`).join('') + '
'; } @@ -1034,14 +1034,14 @@ html += `

Recent Packets

` + recent.slice(0, 10).map(p => `
- ${escapeHtml(p.payload_type || '?')} + ${escapeHtml(p.payload_type || '?')}${p.observation_count > 1 ? ' 👁 ' + p.observation_count + '' : ''} ${p.timestamp ? timeAgo(p.timestamp) : '—'}
`).join('') + '
'; } html += `
- Full Detail → + Full Detail → 📊 Analytics
`; @@ -1474,6 +1474,7 @@ const text = payload.text || payload.name || ''; const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : ''; const hopStr = hops.length ? `${hops.length}⇢` : ''; + const obsBadge = pkt.observation_count > 1 ? `👁 ${pkt.observation_count}` : ''; const item = document.createElement('div'); item.className = 'live-feed-item live-feed-enter'; @@ -1483,7 +1484,7 @@ item.innerHTML = ` ${icon} ${typeName} - ${hopStr} + ${hopStr}${obsBadge} ${escapeHtml(preview)} ${new Date(pkt._ts || Date.now()).toLocaleTimeString([], {hour:'2-digit',minute:'2-digit',second:'2-digit'})} `; diff --git a/public/node-analytics.js b/public/node-analytics.js index 20d9a8e..db87bc2 100644 --- a/public/node-analytics.js +++ b/public/node-analytics.js @@ -55,7 +55,7 @@
← Back to ${nodeName}

📊 ${nodeName} — Analytics

-
${n.role || 'Unknown role'} · ${s.totalPackets} packets in ${days}d window
+
${n.role || 'Unknown role'} · ${s.totalTransmissions || s.totalPackets} packets in ${days}d window
diff --git a/public/nodes.js b/public/nodes.js index 11c07b3..a9f2621 100644 --- a/public/nodes.js +++ b/public/nodes.js @@ -128,7 +128,7 @@
Last Heard
${lastHeard ? timeAgo(lastHeard) : (n.last_seen ? timeAgo(n.last_seen) : '—')}
First Seen
${n.first_seen ? new Date(n.first_seen).toLocaleString() : '—'}
-
Total Packets
${stats.totalPackets || n.advert_count || 0}
+
Total Packets
${stats.totalTransmissions || stats.totalPackets || n.advert_count || 0}${stats.totalObservations && stats.totalObservations !== (stats.totalTransmissions || stats.totalPackets) ? ' (seen ' + stats.totalObservations + '×)' : ''}
Packets Today
${stats.packetsToday || 0}
${stats.avgSnr != null ? `
Avg SNR
${stats.avgSnr.toFixed(1)} dB
` : ''} ${stats.avgHops ? `
Avg Hops
${stats.avgHops}
` : ''} @@ -161,9 +161,10 @@ const obs = p.observer_name || p.observer_id; const snr = p.snr != null ? ` · SNR ${p.snr}dB` : ''; const rssi = p.rssi != null ? ` · RSSI ${p.rssi}dBm` : ''; + const obsBadge = p.observation_count > 1 ? ` 👁 ${p.observation_count}` : ''; return `
${timeAgo(p.timestamp)} - ${typeLabel}${detail}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi} + ${typeLabel}${detail}${obsBadge}${obs ? ' via ' + escapeHtml(obs) : ''}${snr}${rssi} Analyze →
`; }).join('') : '
No recent packets
'} @@ -426,7 +427,7 @@ const role = (n.role || '').toLowerCase(); const { degradedMs, silentMs } = getHealthThresholds(role); const statusLabel = statusAge < degradedMs ? '🟢 Active' : statusAge < silentMs ? '🟡 Degraded' : '🔴 Silent'; - const totalPackets = stats.totalPackets || n.advert_count || 0; + const totalPackets = stats.totalTransmissions || stats.totalPackets || n.advert_count || 0; panel.innerHTML = `
@@ -480,6 +481,7 @@