diff --git a/packet-store.js b/packet-store.js index 82ba002a..f32b231f 100644 --- a/packet-store.js +++ b/packet-store.js @@ -1,8 +1,9 @@ 'use strict'; /** - * In-memory packet store — loads all packets from SQLite on startup, + * In-memory packet store — loads transmissions + observations from SQLite on startup, * serves reads from RAM, writes to both RAM + SQLite. + * M3: Restructured around transmissions (deduped by hash) with observations. * Caps memory at configurable limit (default 1GB). */ class PacketStore { @@ -16,17 +17,22 @@ class PacketStore { // SQLite-only mode: skip RAM loading, all reads go to DB this.sqliteOnly = process.env.NO_MEMORY_STORE === '1'; - // Core storage: array sorted by timestamp DESC (newest first) + // Primary storage: transmissions sorted by first_seen DESC (newest first) + // Each transmission looks like a packet for backward compat this.packets = []; + // Indexes - this.byId = new Map(); - this.byHash = new Map(); // hash → [packet, ...] - this.byObserver = new Map(); // observer_id → [packet, ...] - this.byNode = new Map(); // pubkey → [packet, ...] - this.byTransmission = new Map(); // hash → {id, hash, first_seen, payload_type, decoded_json, observations: []} + this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links) + this.byHash = new Map(); // hash → transmission object (1:1) + this.byObserver = new Map(); // observer_id → [observation objects] + this.byNode = new Map(); // pubkey → [transmission objects] (deduped) + this.byTransmission = new Map(); // hash → transmission object (same refs as byHash) + + // Track which hashes are indexed per node pubkey (avoid dupes in byNode) + this._nodeHashIndex = new Map(); // pubkey → Set this.loaded = false; - this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 }; + this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 }; } /** Load all packets from SQLite into memory */ @@ -36,88 +42,310 @@ class PacketStore { this.loaded = true; return this; } + const t0 = Date.now(); + + // Check if normalized schema exists + const hasTransmissions = this.db.prepare( + "SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'" + ).get(); + + if (hasTransmissions) { + this._loadNormalized(); + } else { + this._loadLegacy(); + } + + this.stats.totalLoaded = this.packets.length; + this.loaded = true; + const elapsed = Date.now() - t0; + console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); + return this; + } + + /** Load from normalized transmissions + observations tables */ + _loadNormalized() { + const rows = this.db.prepare(` + SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type, + t.payload_type, t.payload_version, t.decoded_json, + o.id AS observation_id, o.observer_id, o.observer_name, o.direction, + o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp + FROM transmissions t + LEFT JOIN observations o ON o.transmission_id = t.id + ORDER BY t.first_seen DESC, o.timestamp DESC + `).all(); + + for (const row of rows) { + if (this.packets.length >= this.maxPackets && !this.byTransmission.has(row.hash)) break; + + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.transmission_id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.first_seen, + timestamp: row.first_seen, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + // Filled from first observation for backward compat + observer_id: null, + observer_name: null, + snr: null, + rssi: null, + path_json: null, + direction: null, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); + } + + if (row.observation_id != null) { + const obs = { + id: row.observation_id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.obs_timestamp, + // Carry transmission fields for backward compat + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + + tx.observations.push(obs); + tx.observation_count++; + + // Fill first observation data into transmission for backward compat + if (tx.observer_id == null && obs.observer_id) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + tx.direction = obs.direction; + } + + // byId maps observation IDs for packet detail links + this.byId.set(obs.id, obs); + + // byObserver + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + } + } + } + + /** Fallback: load from legacy packets table */ + _loadLegacy() { const rows = this.db.prepare( 'SELECT * FROM packets ORDER BY timestamp DESC' ).all(); for (const row of rows) { if (this.packets.length >= this.maxPackets) break; - this._index(row); - this.packets.push(row); + this._indexLegacy(row); } - - this.stats.totalLoaded = this.packets.length; - this.loaded = true; - const elapsed = Date.now() - t0; - console.log(`[PacketStore] Loaded ${this.packets.length} packets in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`); - return this; } - /** Index a packet into all lookup maps */ - _index(pkt) { - this.byId.set(pkt.id, pkt); - - if (pkt.hash) { - if (!this.byHash.has(pkt.hash)) this.byHash.set(pkt.hash, []); - this.byHash.get(pkt.hash).push(pkt); + /** Index a legacy packet row (old flat structure) — builds transmission + observation */ + _indexLegacy(pkt) { + let tx = this.byTransmission.get(pkt.hash); + if (!tx) { + tx = { + id: pkt.id, + raw_hex: pkt.raw_hex, + hash: pkt.hash, + first_seen: pkt.timestamp, + timestamp: pkt.timestamp, + route_type: pkt.route_type, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + observations: [], + observation_count: 0, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + snr: pkt.snr, + rssi: pkt.rssi, + path_json: pkt.path_json, + direction: pkt.direction, + }; + this.byTransmission.set(pkt.hash, tx); + this.byHash.set(pkt.hash, tx); + this.packets.push(tx); + this._indexByNode(tx); } + if (pkt.timestamp < tx.first_seen) { + tx.first_seen = pkt.timestamp; + tx.timestamp = pkt.timestamp; + } + + const obs = { + id: pkt.id, + observer_id: pkt.observer_id, + observer_name: pkt.observer_name, + direction: pkt.direction, + snr: pkt.snr, + rssi: pkt.rssi, + score: pkt.score, + path_json: pkt.path_json, + timestamp: pkt.timestamp, + hash: pkt.hash, + raw_hex: pkt.raw_hex, + payload_type: pkt.payload_type, + decoded_json: pkt.decoded_json, + route_type: pkt.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + this.byId.set(pkt.id, obs); + if (pkt.observer_id) { if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []); - this.byObserver.get(pkt.observer_id).push(pkt); + this.byObserver.get(pkt.observer_id).push(obs); } - // Index by node pubkeys mentioned in decoded_json - this._indexByNode(pkt); - - // Index by transmission (dedup view) - if (pkt.hash) { - if (!this.byTransmission.has(pkt.hash)) { - this.byTransmission.set(pkt.hash, { - id: pkt.id, - hash: pkt.hash, - first_seen: pkt.timestamp, - payload_type: pkt.payload_type, - decoded_json: pkt.decoded_json, - observations: [], - }); - } - const tx = this.byTransmission.get(pkt.hash); - if (pkt.timestamp < tx.first_seen) tx.first_seen = pkt.timestamp; - tx.observations.push({ - id: pkt.id, - observer_id: pkt.observer_id, - observer_name: pkt.observer_name, - direction: pkt.direction, - snr: pkt.snr, - rssi: pkt.rssi, - score: pkt.score, - path_json: pkt.path_json, - timestamp: pkt.timestamp, - }); - } + this.stats.totalObservations++; } - /** Extract node pubkeys/names from decoded_json and index */ - _indexByNode(pkt) { - if (!pkt.decoded_json) return; + /** Extract node pubkeys from decoded_json and index transmission in byNode */ + _indexByNode(tx) { + if (!tx.decoded_json) return; try { - const decoded = JSON.parse(pkt.decoded_json); + const decoded = JSON.parse(tx.decoded_json); const keys = new Set(); if (decoded.pubKey) keys.add(decoded.pubKey); if (decoded.destPubKey) keys.add(decoded.destPubKey); if (decoded.srcPubKey) keys.add(decoded.srcPubKey); for (const k of keys) { + if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set()); + if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed + this._nodeHashIndex.get(k).add(tx.hash); if (!this.byNode.has(k)) this.byNode.set(k, []); - this.byNode.get(k).push(pkt); + this.byNode.get(k).push(tx); } } catch {} } + /** Remove oldest transmissions when over memory limit */ + _evict() { + while (this.packets.length > this.maxPackets) { + const old = this.packets.pop(); + this.byHash.delete(old.hash); + this.byTransmission.delete(old.hash); + // Remove observations from byId and byObserver + for (const obs of old.observations) { + this.byId.delete(obs.id); + if (obs.observer_id && this.byObserver.has(obs.observer_id)) { + const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id); + if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id); + } + } + // Skip node index cleanup (expensive, low value) + this.stats.evicted++; + } + } + + /** Insert a new packet (to both memory and SQLite) */ + insert(packetData) { + const id = this.dbModule.insertPacket(packetData); + const row = this.dbModule.getPacket(id); + if (row && !this.sqliteOnly) { + // Update or create transmission in memory + let tx = this.byTransmission.get(row.hash); + if (!tx) { + tx = { + id: row.id, + raw_hex: row.raw_hex, + hash: row.hash, + first_seen: row.timestamp, + timestamp: row.timestamp, + route_type: row.route_type, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + observations: [], + observation_count: 0, + observer_id: row.observer_id, + observer_name: row.observer_name, + snr: row.snr, + rssi: row.rssi, + path_json: row.path_json, + direction: row.direction, + }; + this.byTransmission.set(row.hash, tx); + this.byHash.set(row.hash, tx); + this.packets.unshift(tx); // newest first + this._indexByNode(tx); + } else { + // Update first_seen if earlier + if (row.timestamp < tx.first_seen) { + tx.first_seen = row.timestamp; + tx.timestamp = row.timestamp; + } + } + + // Add observation + const obs = { + id: row.id, + observer_id: row.observer_id, + observer_name: row.observer_name, + direction: row.direction, + snr: row.snr, + rssi: row.rssi, + score: row.score, + path_json: row.path_json, + timestamp: row.timestamp, + hash: row.hash, + raw_hex: row.raw_hex, + payload_type: row.payload_type, + decoded_json: row.decoded_json, + route_type: row.route_type, + }; + tx.observations.push(obs); + tx.observation_count++; + + // Update transmission's display fields if this is first observation + if (tx.observations.length === 1) { + tx.observer_id = obs.observer_id; + tx.observer_name = obs.observer_name; + tx.snr = obs.snr; + tx.rssi = obs.rssi; + tx.path_json = obs.path_json; + } + + this.byId.set(obs.id, obs); + if (obs.observer_id) { + if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []); + this.byObserver.get(obs.observer_id).push(obs); + } + + this.stats.totalObservations++; + this._evict(); + this.stats.inserts++; + } + return id; + } + /** * Find ALL packets referencing a node — by pubkey index + name + pubkey text search. - * Single source of truth for "get packets for node X". + * Returns unique transmissions (deduped). * @param {string} nodeIdOrName - pubkey or friendly name * @param {Array} [fromPackets] - packet array to filter (defaults to this.packets) * @returns {{ packets: Array, pubkey: string, nodeName: string }} @@ -132,49 +360,24 @@ class PacketStore { if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; } } catch {} - // Combine: index hits + text search by both name and pubkey + // Combine: index hits + text search const indexed = this.byNode.get(pubkey); - const idSet = indexed ? new Set(indexed.map(p => p.id)) : new Set(); + const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set(); const source = fromPackets || this.packets; - const packets = source.filter(p => - idSet.has(p.id) || - (p.decoded_json && (p.decoded_json.includes(nodeName) || p.decoded_json.includes(pubkey))) + const packets = source.filter(t => + hashSet.has(t.hash) || + (t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey))) ); return { packets, pubkey, nodeName }; } - /** Remove oldest packets when over memory limit */ - _evict() { - while (this.packets.length > this.maxPackets) { - const old = this.packets.pop(); - this.byId.delete(old.id); - // Remove from hash index - if (old.hash && this.byHash.has(old.hash)) { - const arr = this.byHash.get(old.hash).filter(p => p.id !== old.id); - if (arr.length) this.byHash.set(old.hash, arr); else this.byHash.delete(old.hash); - } - // Remove from observer index - if (old.observer_id && this.byObserver.has(old.observer_id)) { - const arr = this.byObserver.get(old.observer_id).filter(p => p.id !== old.id); - if (arr.length) this.byObserver.set(old.observer_id, arr); else this.byObserver.delete(old.observer_id); - } - // Skip node index cleanup for eviction (expensive, low value) - this.stats.evicted++; - } - } - - /** Insert a new packet (to both memory and SQLite) */ - insert(packetData) { - const id = this.dbModule.insertPacket(packetData); - const row = this.dbModule.getPacket(id); - if (row) { - this.packets.unshift(row); // newest first - this._index(row); - this._evict(); - this.stats.inserts++; - } - return id; + /** Count transmissions and observations for a node */ + countForNode(pubkey) { + const txs = this.byNode.get(pubkey) || []; + let observations = 0; + for (const tx of txs) observations += tx.observation_count; + return { transmissions: txs.length, observations }; } /** Query packets with filters — all from memory (or SQLite in fallback mode) */ @@ -187,9 +390,11 @@ class PacketStore { // Use indexes for single-key filters when possible if (hash && !type && !route && !region && !observer && !since && !until && !node) { - results = this.byHash.get(hash) || []; + const tx = this.byHash.get(hash); + results = tx ? [tx] : []; } else if (observer && !type && !route && !region && !hash && !since && !until && !node) { - results = this.byObserver.get(observer) || []; + // For observer filter, find unique transmissions where any observation matches + results = this._transmissionsForObserver(observer); } else if (node && !type && !route && !region && !observer && !hash && !since && !until) { results = this.findPacketsForNode(node).packets; } else { @@ -202,18 +407,22 @@ class PacketStore { const r = Number(route); results = results.filter(p => p.route_type === r); } - if (observer) results = results.filter(p => p.observer_id === observer); - if (hash) results = results.filter(p => p.hash === hash); + if (observer) results = this._transmissionsForObserver(observer, results); + if (hash) { + const tx = this.byHash.get(hash); + results = tx ? results.filter(p => p.hash === hash) : []; + } if (since) results = results.filter(p => p.timestamp > since); if (until) results = results.filter(p => p.timestamp < until); if (region) { - // Need to look up observers for this region const regionObservers = new Set(); try { const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region); obs.forEach(o => regionObservers.add(o.id)); } catch {} - results = results.filter(p => regionObservers.has(p.observer_id)); + results = results.filter(p => + p.observations.some(o => regionObservers.has(o.observer_id)) + ); } if (node) { results = this.findPacketsForNode(node, results).packets; @@ -237,52 +446,52 @@ class PacketStore { return { packets: paginated, total }; } - /** Query with groupByHash — aggregate packets by content hash */ + /** Find unique transmissions that have at least one observation from given observer */ + _transmissionsForObserver(observerId, fromTransmissions) { + if (fromTransmissions) { + return fromTransmissions.filter(tx => + tx.observations.some(o => o.observer_id === observerId) + ); + } + // Use byObserver index: get observations, then unique transmissions + const obs = this.byObserver.get(observerId) || []; + const seen = new Set(); + const result = []; + for (const o of obs) { + if (!seen.has(o.hash)) { + seen.add(o.hash); + const tx = this.byTransmission.get(o.hash); + if (tx) result.push(tx); + } + } + return result; + } + + /** Query with groupByHash — now trivial since packets ARE transmissions */ queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) { this.stats.queries++; if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node }); - // Get filtered results first + // Get filtered transmissions const { packets: filtered, total: filteredTotal } = this.query({ limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node }); - // Group by hash - const groups = new Map(); - for (const p of filtered) { - const h = p.hash || p.id.toString(); - if (!groups.has(h)) { - groups.set(h, { - hash: p.hash, - observer_count: new Set(), - count: 0, - latest: p.timestamp, - observer_id: p.observer_id, - observer_name: p.observer_name, - path_json: p.path_json, - payload_type: p.payload_type, - raw_hex: p.raw_hex, - decoded_json: p.decoded_json, - }); - } - const g = groups.get(h); - g.count++; - if (p.observer_id) g.observer_count.add(p.observer_id); - if (p.timestamp > g.latest) { - g.latest = p.timestamp; - } - // Keep longest path - if (p.path_json && (!g.path_json || p.path_json.length > g.path_json.length)) { - g.path_json = p.path_json; - g.raw_hex = p.raw_hex; - } - } - - // Sort by latest DESC, paginate - const sorted = [...groups.values()] - .map(g => ({ ...g, observer_count: g.observer_count.size })) - .sort((a, b) => b.latest.localeCompare(a.latest)); + // Already grouped by hash — just format for backward compat + const sorted = filtered.map(tx => ({ + hash: tx.hash, + count: tx.observation_count, + observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size, + latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp, + observer_id: tx.observer_id, + observer_name: tx.observer_name, + path_json: tx.path_json, + payload_type: tx.payload_type, + raw_hex: tx.raw_hex, + decoded_json: tx.decoded_json, + observation_count: tx.observation_count, + })).sort((a, b) => b.latest.localeCompare(a.latest)); const total = sorted.length; const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit)); @@ -302,25 +511,26 @@ class PacketStore { return results.reverse(); } - /** Get a single packet by ID */ + /** Get a single packet by ID — checks observation IDs first (backward compat) */ getById(id) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE id = ?').get(id) || null; return this.byId.get(id) || null; } - /** Get all siblings of a packet (same hash) */ + /** Get all siblings of a packet (same hash) — returns observations array */ getSiblings(hash) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE hash = ? ORDER BY timestamp DESC').all(hash); - return this.byHash.get(hash) || []; + const tx = this.byTransmission.get(hash); + return tx ? tx.observations : []; } - /** Get all packets (raw array reference — do not mutate) */ + /** Get all transmissions (backward compat — returns packets array) */ all() { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all(); return this.packets; } - /** Get all packets matching a filter function */ + /** Get all transmissions matching a filter function */ filter(fn) { if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all().filter(fn); return this.packets.filter(fn);