mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-03-29 10:50:17 +00:00
- Add hash field back to observation objects in packet-store.js (both _loadNormalized and insert paths) — it's only 16 chars, negligible memory vs the big fields raw_hex + decoded_json - Fix /api/analytics/signal: look up raw_hex from transmission via byTxId for packet size calculation - Fix /api/observers/:id/analytics: enrich obsPackets so payload_type and decoded_json are available for type breakdown and node buckets - Endpoints /api/nodes/bulk-health, /api/nodes/network-status, and /api/analytics/subpaths now work because observations carry hash All 625 tests pass (unit + integration). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
753 lines
29 KiB
JavaScript
753 lines
29 KiB
JavaScript
'use strict';
|
|
|
|
/**
 * In-memory packet store — loads transmissions + observations from SQLite on startup,
 * serves reads from RAM, writes to both RAM + SQLite.
 * M3: Restructured around transmissions (deduped by hash) with observations.
 * Caps memory at configurable limit (default 1GB).
 */
class PacketStore {
  /**
   * @param {object} dbModule - The full db module (has .db, .insertTransmission, .getPacket)
   * @param {object} [config]
   * @param {number} [config.maxMemoryMB=1024] - Memory budget used to derive maxPackets
   * @param {number} [config.estimatedPacketBytes=450] - Assumed size of one transmission
   */
  constructor(dbModule, config = {}) {
    this.dbModule = dbModule; // The full db module (has .db, .insertTransmission, .getPacket)
    this.db = dbModule.db; // Raw better-sqlite3 instance for queries
    this.maxBytes = (config.maxMemoryMB || 1024) * 1024 * 1024;
    this.estPacketBytes = config.estimatedPacketBytes || 450;
    this.maxPackets = Math.floor(this.maxBytes / this.estPacketBytes);

    // SQLite-only mode: skip RAM loading, all reads go to DB
    this.sqliteOnly = process.env.NO_MEMORY_STORE === '1';

    // Primary storage: transmissions sorted by first_seen DESC (newest first)
    // Each transmission looks like a packet for backward compat
    this.packets = [];

    // Indexes
    this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links)
    this.byTxId = new Map(); // transmission_id → transmission object
    this.byHash = new Map(); // hash → transmission object (1:1)
    this.byObserver = new Map(); // observer_id → [observation objects]
    this.byNode = new Map(); // pubkey → [transmission objects] (deduped)

    // Track which hashes are indexed per node pubkey (avoid dupes in byNode)
    this._nodeHashIndex = new Map(); // pubkey → Set<hash>
    this._advertByObserver = new Map(); // pubkey → Set<observer_id> (ADVERT-only, for region filtering)

    this.loaded = false;
    this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 };
  }

  /**
   * True when a numeric filter (type / route) was actually supplied.
   * `0` and `'0'` are valid filter values, so a plain truthiness test is wrong here.
   */
  static _isSet(value) {
    return value !== undefined && value !== null && value !== '';
  }

  /**
   * Number of hops in a path_json string.
   * Returns 0 for missing, malformed or non-array JSON.
   */
  static _pathLength(pathJson) {
    try {
      const parsed = JSON.parse(pathJson || '[]');
      return Array.isArray(parsed) ? parsed.length : 0;
    } catch {
      return 0;
    }
  }

  /**
   * Build a transmission object. `display` supplies the observer-derived
   * display fields; pass null to leave them null until observations arrive.
   */
  _createTransmission(id, src, firstSeen, display) {
    return {
      id,
      raw_hex: src.raw_hex,
      hash: src.hash,
      first_seen: firstSeen,
      timestamp: firstSeen,
      route_type: src.route_type,
      payload_type: src.payload_type,
      decoded_json: src.decoded_json,
      observations: [],
      observation_count: 0,
      // Display fields, kept on the transmission for backward compat
      observer_id: display ? display.observer_id : null,
      observer_name: display ? display.observer_name : null,
      snr: display ? display.snr : null,
      rssi: display ? display.rssi : null,
      path_json: display ? display.path_json : null,
      direction: display ? display.direction : null,
    };
  }

  /** Add a transmission to the packets array and to the hash / tx-id / node indexes. */
  _registerTransmission(tx, newestFirst) {
    this.byHash.set(tx.hash, tx);
    if (newestFirst) this.packets.unshift(tx);
    else this.packets.push(tx);
    this.byTxId.set(tx.id, tx);
    this._indexByNode(tx);
  }

  /** Build a lean observation object (carries the transmission hash). */
  _createObservation(id, tx, src, timestamp) {
    return {
      id,
      transmission_id: tx.id,
      hash: tx.hash,
      observer_id: src.observer_id,
      observer_name: src.observer_name,
      direction: src.direction,
      snr: src.snr,
      rssi: src.rssi,
      score: src.score,
      path_json: src.path_json,
      timestamp,
    };
  }

  /**
   * Attach an observation to its transmission and index it.
   * Dedup: an observation with the same observer + same path as one already
   * recorded is skipped entirely, so the indexes never hold an observation
   * that eviction cannot reach through tx.observations.
   * @returns {boolean} true when the observation was recorded, false for a duplicate
   */
  _recordObservation(tx, obs) {
    const pathKey = obs.path_json || '';
    const isDupe = tx.observations.some(
      o => o.observer_id === obs.observer_id && (o.path_json || '') === pathKey
    );
    if (isDupe) return false;

    tx.observations.push(obs);
    tx.observation_count++;

    // byId maps observation IDs for packet detail links
    if (obs.id != null) this.byId.set(obs.id, obs);

    if (obs.observer_id) {
      if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
      this.byObserver.get(obs.observer_id).push(obs);
    }

    this.stats.totalObservations++;
    return true;
  }

  /** Load all packets from SQLite into memory */
  load() {
    if (this.sqliteOnly) {
      console.log('[PacketStore] SQLite-only mode (NO_MEMORY_STORE=1) — all reads go to database');
      this.loaded = true;
      return this;
    }

    const t0 = Date.now();

    // Check if normalized schema exists
    const hasTransmissions = this.db.prepare(
      "SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'"
    ).get();

    if (hasTransmissions) {
      this._loadNormalized();
    } else {
      this._loadLegacy();
    }

    this.stats.totalLoaded = this.packets.length;
    this.loaded = true;
    const elapsed = Date.now() - t0;
    console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
    return this;
  }

  /** Load from normalized transmissions + observations tables */
  _loadNormalized() {
    // Detect v3 schema (observer_idx instead of observer_id in observations)
    const obsCols = this.db.pragma('table_info(observations)').map(c => c.name);
    const isV3 = obsCols.includes('observer_idx');

    const sql = isV3
      ? `SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
           t.payload_type, t.payload_version, t.decoded_json,
           o.id AS observation_id, obs.id AS observer_id, obs.name AS observer_name, o.direction,
           o.snr, o.rssi, o.score, o.path_json, datetime(o.timestamp, 'unixepoch') AS obs_timestamp
         FROM transmissions t
         LEFT JOIN observations o ON o.transmission_id = t.id
         LEFT JOIN observers obs ON obs.rowid = o.observer_idx
         ORDER BY t.first_seen DESC, o.timestamp DESC`
      : `SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
           t.payload_type, t.payload_version, t.decoded_json,
           o.id AS observation_id, o.observer_id, o.observer_name, o.direction,
           o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp
         FROM transmissions t
         LEFT JOIN observations o ON o.transmission_id = t.id
         ORDER BY t.first_seen DESC, o.timestamp DESC`;

    for (const row of this.db.prepare(sql).iterate()) {
      // Stop at the cap, but keep attaching observations to transmissions already loaded
      if (this.packets.length >= this.maxPackets && !this.byHash.has(row.hash)) break;

      let tx = this.byHash.get(row.hash);
      if (!tx) {
        // Display fields stay null here; the post-load pass below fills them
        tx = this._createTransmission(row.transmission_id, row, row.first_seen, null);
        this._registerTransmission(tx, false);
      }

      // LEFT JOIN yields a null observation for transmissions without observations
      if (row.observation_id != null) {
        const obs = this._createObservation(row.observation_id, tx, row, row.obs_timestamp);
        this._recordObservation(tx, obs);
      }
    }

    // Post-load: set each transmission's display path to the LONGEST observation path
    // (most representative of mesh topology — short paths are just nearby observers)
    for (const tx of this.packets) {
      if (tx.observations.length === 0) continue;
      let best = tx.observations[0];
      let bestLen = PacketStore._pathLength(best.path_json);
      for (let i = 1; i < tx.observations.length; i++) {
        const len = PacketStore._pathLength(tx.observations[i].path_json);
        if (len > bestLen) {
          best = tx.observations[i];
          bestLen = len;
        }
      }
      tx.observer_id = best.observer_id;
      tx.observer_name = best.observer_name;
      tx.snr = best.snr;
      tx.rssi = best.rssi;
      tx.path_json = best.path_json;
      tx.direction = best.direction;
    }

    // Post-load: build ADVERT-by-observer index (needs all observations loaded first)
    for (const tx of this.packets) {
      if (tx.payload_type === 4 && tx.decoded_json) {
        try {
          const d = JSON.parse(tx.decoded_json);
          if (d.pubKey) this._indexAdvertObservers(d.pubKey, tx);
        } catch {
          // Malformed decoded_json: leave this transmission out of the ADVERT index
        }
      }
    }
    console.log(`[PacketStore] ADVERT observer index: ${this._advertByObserver.size} nodes tracked`);
  }

  /** Fallback: load from legacy packets table */
  _loadLegacy() {
    for (const row of this.db.prepare(
      'SELECT * FROM packets_v ORDER BY timestamp DESC'
    ).iterate()) {
      if (this.packets.length >= this.maxPackets) break;
      this._indexLegacy(row);
    }
  }

  /**
   * Index a legacy packet row (old flat structure) — builds transmission + observation
   * @param {object} pkt - one row of packets_v
   * @returns {object} the transmission the row belongs to
   */
  _indexLegacy(pkt) {
    let tx = this.byHash.get(pkt.hash);
    if (!tx) {
      tx = this._createTransmission(pkt.id, pkt, pkt.timestamp, pkt);
      this._registerTransmission(tx, false);
    }

    if (pkt.timestamp < tx.first_seen) {
      tx.first_seen = pkt.timestamp;
      tx.timestamp = pkt.timestamp;
    }

    // Update display path if new observation has longer path
    if (PacketStore._pathLength(pkt.path_json) > PacketStore._pathLength(tx.path_json)) {
      tx.observer_id = pkt.observer_id;
      tx.observer_name = pkt.observer_name;
      tx.path_json = pkt.path_json;
    }

    const obs = this._createObservation(pkt.id, tx, pkt, pkt.timestamp);
    this._recordObservation(tx, obs);
    return tx;
  }

  /** Extract node pubkeys from decoded_json and index transmission in byNode */
  _indexByNode(tx) {
    if (!tx.decoded_json) return;
    try {
      const decoded = JSON.parse(tx.decoded_json);
      const keys = new Set();
      if (decoded.pubKey) keys.add(decoded.pubKey);
      if (decoded.destPubKey) keys.add(decoded.destPubKey);
      if (decoded.srcPubKey) keys.add(decoded.srcPubKey);
      for (const k of keys) {
        if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set());
        if (this._nodeHashIndex.get(k).has(tx.hash)) continue;
        this._nodeHashIndex.get(k).add(tx.hash);
        if (!this.byNode.has(k)) this.byNode.set(k, []);
        this.byNode.get(k).push(tx);
      }
    } catch {
      // Malformed decoded_json: transmission simply is not indexed by node
    }
  }

  /** Track which observers saw an ADVERT from a given pubkey */
  _indexAdvertObservers(pubkey, tx) {
    if (!this._advertByObserver.has(pubkey)) this._advertByObserver.set(pubkey, new Set());
    const s = this._advertByObserver.get(pubkey);
    for (const obs of tx.observations) {
      if (obs.observer_id) s.add(obs.observer_id);
    }
  }

  /**
   * Get node pubkeys whose ADVERTs were seen by any of the given observer IDs
   * @param {Iterable<string>} observerIds
   * @returns {Set<string>}
   */
  getNodesByAdvertObservers(observerIds) {
    const result = new Set();
    for (const [pubkey, observers] of this._advertByObserver) {
      for (const obsId of observerIds) {
        if (observers.has(obsId)) {
          result.add(pubkey);
          break;
        }
      }
    }
    return result;
  }

  /** Remove oldest transmissions when over memory limit */
  _evict() {
    while (this.packets.length > this.maxPackets) {
      const old = this.packets.pop();
      this.byHash.delete(old.hash);
      this.byTxId.delete(old.id);
      // Remove observations from byId and byObserver
      for (const obs of old.observations) {
        this.byId.delete(obs.id);
        if (obs.observer_id && this.byObserver.has(obs.observer_id)) {
          const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id);
          if (arr.length) this.byObserver.set(obs.observer_id, arr);
          else this.byObserver.delete(obs.observer_id);
        }
      }
      // Skip node index cleanup (expensive, low value)
      this.stats.evicted++;
    }
  }

  /**
   * Insert a new packet (to both memory and SQLite)
   * @param {object} packetData - flat packet fields (hash, raw_hex, timestamp, observer_id, ...)
   * @returns {*} the observation ID, or the transmission ID when no observation ID is available
   */
  insert(packetData) {
    // Write to normalized tables and get the transmission ID
    const txResult = this.dbModule.insertTransmission ? this.dbModule.insertTransmission(packetData) : null;
    const transmissionId = txResult ? txResult.transmissionId : null;
    const observationId = txResult ? txResult.observationId : null;
    const resultId = observationId || transmissionId;

    if (this.sqliteOnly) return resultId;

    // Update or create transmission in memory (built directly from packetData —
    // avoids view ID mismatch issues)
    let tx = this.byHash.get(packetData.hash);
    if (!tx) {
      tx = this._createTransmission(transmissionId || observationId, packetData, packetData.timestamp, packetData);
      this._registerTransmission(tx, true); // newest first
    } else {
      // Update first_seen if earlier
      if (packetData.timestamp < tx.first_seen) {
        tx.first_seen = packetData.timestamp;
        tx.timestamp = packetData.timestamp;
      }
      // Update display path if new observation has longer path
      if (PacketStore._pathLength(packetData.path_json) > PacketStore._pathLength(tx.path_json)) {
        tx.observer_id = packetData.observer_id;
        tx.observer_name = packetData.observer_name;
        tx.path_json = packetData.path_json;
      }
    }

    // Add observation (duplicates — same observer + same path — are skipped)
    const obs = this._createObservation(observationId, tx, packetData, packetData.timestamp);
    const added = this._recordObservation(tx, obs);

    // Update transmission's display fields if this is first observation
    if (added && tx.observations.length === 1) {
      tx.observer_id = obs.observer_id;
      tx.observer_name = obs.observer_name;
      tx.snr = obs.snr;
      tx.rssi = obs.rssi;
      tx.path_json = obs.path_json;
    }

    // Update ADVERT observer index for live ingestion
    if (tx.payload_type === 4 && obs.observer_id && tx.decoded_json) {
      try {
        const d = JSON.parse(tx.decoded_json);
        if (d.pubKey) {
          if (!this._advertByObserver.has(d.pubKey)) this._advertByObserver.set(d.pubKey, new Set());
          this._advertByObserver.get(d.pubKey).add(obs.observer_id);
        }
      } catch {
        // Malformed decoded_json: nothing to index
      }
    }

    this._evict();
    this.stats.inserts++;
    return resultId;
  }

  /**
   * Find ALL packets referencing a node — by pubkey index + name + pubkey text search.
   * Returns unique transmissions (deduped).
   * @param {string} nodeIdOrName - pubkey or friendly name
   * @param {Array} [fromPackets] - packet array to filter (defaults to this.packets)
   * @returns {{ packets: Array, pubkey: string, nodeName: string }}
   */
  findPacketsForNode(nodeIdOrName, fromPackets) {
    let pubkey = nodeIdOrName;
    let nodeName = nodeIdOrName;

    // Always resolve to get both pubkey and name
    try {
      const row = this.db.prepare("SELECT public_key, name FROM nodes WHERE public_key = ? OR name = ? LIMIT 1").get(nodeIdOrName, nodeIdOrName);
      if (row) {
        pubkey = row.public_key;
        nodeName = row.name || nodeIdOrName;
      }
    } catch {
      // Best effort: without the nodes table, fall back to the raw identifier
    }

    // Combine: index hits + text search
    const indexed = this.byNode.get(pubkey);
    const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set();
    const source = fromPackets || this.packets;
    const packets = source.filter(t =>
      hashSet.has(t.hash) ||
      (t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey)))
    );

    return { packets, pubkey, nodeName };
  }

  /** Count transmissions and observations for a node */
  countForNode(pubkey) {
    const txs = this.byNode.get(pubkey) || [];
    let observations = 0;
    for (const tx of txs) observations += tx.observation_count;
    return { transmissions: txs.length, observations };
  }

  /**
   * Apply the query filters in memory and return matching transmissions,
   * newest first. Does not sort by `order`, paginate or touch stats.
   */
  _applyFilters({ type, route, region, observer, hash, since, until, node }) {
    // Hashes are matched lowercase everywhere (same as getSiblings / SQLite path)
    const h = typeof hash === 'string' ? hash.toLowerCase() : hash;
    const hasType = PacketStore._isSet(type);
    const hasRoute = PacketStore._isSet(route);

    const active = [];
    if (hasType) active.push('type');
    if (hasRoute) active.push('route');
    if (region) active.push('region');
    if (observer) active.push('observer');
    if (h) active.push('hash');
    if (since) active.push('since');
    if (until) active.push('until');
    if (node) active.push('node');
    const sole = active.length === 1 ? active[0] : null;

    // Use indexes for single-key filters when possible
    if (sole === 'hash') {
      const tx = this.byHash.get(h);
      return tx ? [tx] : [];
    }
    if (sole === 'observer') {
      // For observer filter, find unique transmissions where any observation matches
      return this._transmissionsForObserver(observer);
    }
    if (sole === 'node') {
      return this.findPacketsForNode(node).packets;
    }

    // Apply filters sequentially
    let results = this.packets;
    if (hasType) {
      const t = Number(type);
      results = results.filter(p => p.payload_type === t);
    }
    if (hasRoute) {
      const r = Number(route);
      results = results.filter(p => p.route_type === r);
    }
    if (observer) results = this._transmissionsForObserver(observer, results);
    if (h) {
      results = this.byHash.has(h) ? results.filter(p => p.hash === h) : [];
    }
    if (since) results = results.filter(p => p.timestamp > since);
    if (until) results = results.filter(p => p.timestamp < until);
    if (region) {
      const regionObservers = new Set();
      try {
        const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region);
        obs.forEach(o => regionObservers.add(o.id));
      } catch {
        // Best effort: an unreadable observers table means no region matches
      }
      results = results.filter(p =>
        p.observations.some(o => regionObservers.has(o.observer_id))
      );
    }
    if (node) {
      results = this.findPacketsForNode(node, results).packets;
    }
    return results;
  }

  /**
   * Query packets with filters — all from memory (or SQLite in fallback mode)
   * @returns {{ packets: Array, total: number }} one page plus the unpaginated match count
   */
  query({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node, order = 'DESC' } = {}) {
    this.stats.queries++;

    if (this.sqliteOnly) return this._querySQLite({ limit, offset, type, route, region, observer, hash, since, until, node, order });

    let results = this._applyFilters({ type, route, region, observer, hash, since, until, node });
    const total = results.length;

    // Sort
    if (order === 'ASC') {
      results = results.slice().sort((a, b) => {
        if (a.timestamp < b.timestamp) return -1;
        if (a.timestamp > b.timestamp) return 1;
        return 0;
      });
    }
    // Default DESC — filtered results are already newest-first

    // Paginate
    const start = Number(offset);
    const paginated = results.slice(start, start + Number(limit));
    return { packets: paginated, total };
  }

  /**
   * Find unique transmissions that have at least one observation from given observer
   * @param {string} observerId
   * @param {Array} [fromTransmissions] - when given, filter this array instead of using the index
   * @returns {Array} transmissions, newest first
   */
  _transmissionsForObserver(observerId, fromTransmissions) {
    if (fromTransmissions) {
      return fromTransmissions.filter(tx =>
        tx.observations.some(o => o.observer_id === observerId)
      );
    }
    // Use byObserver index: get observations, then unique transmissions
    const obs = this.byObserver.get(observerId) || [];
    const seen = new Set();
    const result = [];
    for (const o of obs) {
      const txId = o.transmission_id;
      if (seen.has(txId)) continue;
      seen.add(txId);
      const tx = this.byTxId.get(txId);
      if (tx) result.push(tx);
    }
    // The index holds loaded observations newest-first but live inserts are
    // appended, so restore the newest-first order that query() relies on.
    result.sort((a, b) => {
      if (a.timestamp < b.timestamp) return 1;
      if (a.timestamp > b.timestamp) return -1;
      return 0;
    });
    return result;
  }

  /**
   * Query with groupByHash — now trivial since packets ARE transmissions
   * @returns {{ packets: Array, total: number }} one page of grouped rows, latest activity first
   */
  queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) {
    this.stats.queries++;

    if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node });

    // Get filtered transmissions
    const filtered = this._applyFilters({ type, route, region, observer, hash, since, until, node });

    // Missing timestamps sort as empty strings instead of throwing
    const sortKey = v => (v == null ? '' : String(v));

    // Already grouped by hash — just format for backward compat
    const sorted = filtered.map(tx => ({
      hash: tx.hash,
      first_seen: tx.first_seen || tx.timestamp,
      count: tx.observation_count,
      observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size,
      latest: tx.observations.length
        ? tx.observations.reduce((max, o) => (o.timestamp > max ? o.timestamp : max), tx.observations[0].timestamp)
        : tx.timestamp,
      observer_id: tx.observer_id,
      observer_name: tx.observer_name,
      path_json: tx.path_json,
      payload_type: tx.payload_type,
      route_type: tx.route_type,
      raw_hex: tx.raw_hex,
      decoded_json: tx.decoded_json,
      observation_count: tx.observation_count,
      snr: tx.snr,
      rssi: tx.rssi,
    })).sort((a, b) => sortKey(b.latest).localeCompare(sortKey(a.latest)));

    const total = sorted.length;
    const start = Number(offset);
    const paginated = sorted.slice(start, start + Number(limit));
    return { packets: paginated, total };
  }

  /**
   * Get timestamps for sparkline
   * @param {string} since - only timestamps strictly greater than this are returned
   * @returns {Array} timestamps in ascending order
   */
  getTimestamps(since) {
    if (this.sqliteOnly) {
      return this.db.prepare('SELECT timestamp FROM packets_v WHERE timestamp > ? ORDER BY timestamp ASC').all(since).map(r => r.timestamp);
    }
    const results = [];
    for (const p of this.packets) {
      // packets is newest-first, so the first old entry ends the scan
      if (p.timestamp <= since) break;
      results.push(p.timestamp);
    }
    return results.reverse();
  }

  /** Get a single packet by ID — checks observation IDs first (backward compat) */
  getById(id) {
    if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v WHERE id = ?').get(id) || null;
    const obs = this.byId.get(id) || null;
    return this._enrichObs(obs);
  }

  /** Get a transmission by its transmission table ID */
  getByTxId(id) {
    if (this.sqliteOnly) return this.db.prepare('SELECT * FROM transmissions WHERE id = ?').get(id) || null;
    return this.byTxId.get(id) || null;
  }

  /** Get all siblings of a packet (same hash) — returns enriched observations array */
  getSiblings(hash) {
    if (typeof hash !== 'string') return [];
    const h = hash.toLowerCase();
    if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v WHERE hash = ? ORDER BY timestamp DESC').all(h);
    const tx = this.byHash.get(h);
    return tx ? tx.observations.map(o => this._enrichObs(o)) : [];
  }

  /** Get all transmissions (backward compat — returns packets array) */
  all() {
    if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v ORDER BY timestamp DESC').all();
    return this.packets;
  }

  /** Get all transmissions matching a filter function */
  filter(fn) {
    if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v ORDER BY timestamp DESC').all().filter(fn);
    return this.packets.filter(fn);
  }

  /**
   * Enrich a lean observation with transmission fields (for API responses)
   * @returns {object|null} a new object; the observation itself when its
   *   transmission is no longer in memory; null for a missing observation
   */
  _enrichObs(obs) {
    if (!obs) return null;
    const tx = this.byTxId.get(obs.transmission_id);
    if (!tx) return obs;
    return {
      ...obs,
      hash: tx.hash,
      raw_hex: tx.raw_hex,
      payload_type: tx.payload_type,
      decoded_json: tx.decoded_json,
      route_type: tx.route_type,
    };
  }

  /** Enrich an array of observations with transmission fields */
  enrichObservations(observations) {
    if (!observations || !observations.length) return observations;
    return observations.map(o => this._enrichObs(o));
  }

  /** Memory stats */
  getStats() {
    return {
      ...this.stats,
      inMemory: this.sqliteOnly ? 0 : this.packets.length,
      sqliteOnly: this.sqliteOnly,
      maxPackets: this.maxPackets,
      estimatedMB: this.sqliteOnly ? 0 : Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024),
      maxMB: Math.round(this.maxBytes / 1024 / 1024),
      indexes: {
        byHash: this.byHash.size,
        byObserver: this.byObserver.size,
        byNode: this.byNode.size,
        advertByObserver: this._advertByObserver.size,
      }
    };
  }

  /**
   * Build the WHERE clause + bound parameters shared by both SQLite fallbacks.
   * @returns {{ clause: string, params: Array }} clause is '' when no filter applies
   */
  _buildSQLiteWhere({ type, route, region, observer, hash, since, until, node }) {
    const where = [];
    const params = [];
    if (PacketStore._isSet(type)) { where.push('payload_type = ?'); params.push(Number(type)); }
    if (PacketStore._isSet(route)) { where.push('route_type = ?'); params.push(Number(route)); }
    if (observer) { where.push('observer_id = ?'); params.push(observer); }
    if (hash) { where.push('hash = ?'); params.push(hash.toLowerCase()); }
    if (since) { where.push('timestamp > ?'); params.push(since); }
    if (until) { where.push('timestamp < ?'); params.push(until); }
    if (region) { where.push('observer_id IN (SELECT id FROM observers WHERE iata = ?)'); params.push(region); }
    if (node) {
      // Resolve a friendly name to its pubkey when possible; otherwise match the raw value
      let needle = node;
      try {
        const nr = this.db.prepare('SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1').get(node, node);
        if (nr) needle = nr.public_key;
      } catch (e) {
        needle = node;
      }
      where.push('decoded_json LIKE ?');
      params.push('%' + needle + '%');
    }
    return { clause: where.length ? 'WHERE ' + where.join(' AND ') : '', params };
  }

  /** SQLite fallback: query with filters */
  _querySQLite({ limit, offset, type, route, region, observer, hash, since, until, node, order }) {
    const { clause: w, params } = this._buildSQLiteWhere({ type, route, region, observer, hash, since, until, node });
    const total = this.db.prepare(`SELECT COUNT(*) as c FROM packets_v ${w}`).get(...params).c;
    const packets = this.db.prepare(`SELECT * FROM packets_v ${w} ORDER BY timestamp ${order === 'ASC' ? 'ASC' : 'DESC'} LIMIT ? OFFSET ?`).all(...params, Number(limit), Number(offset));
    return { packets, total };
  }

  /** SQLite fallback: grouped query */
  _queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node }) {
    const { clause: w, params } = this._buildSQLiteWhere({ type, route, region, observer, hash, since, until, node });

    const sql = `SELECT hash, COUNT(*) as count, COUNT(DISTINCT observer_id) as observer_count,
      MAX(timestamp) as latest, MIN(observer_id) as observer_id, MIN(observer_name) as observer_name,
      MIN(path_json) as path_json, MIN(payload_type) as payload_type, MIN(route_type) as route_type,
      MIN(raw_hex) as raw_hex, MIN(decoded_json) as decoded_json, MIN(snr) as snr, MIN(rssi) as rssi
      FROM packets_v ${w} GROUP BY hash ORDER BY latest DESC LIMIT ? OFFSET ?`;
    const packets = this.db.prepare(sql).all(...params, Number(limit), Number(offset));

    const countSql = `SELECT COUNT(DISTINCT hash) as c FROM packets_v ${w}`;
    const total = this.db.prepare(countSql).get(...params).c;
    return { packets, total };
  }
}
|
|
|
|
// CommonJS export: the module's export is the PacketStore class itself.
module.exports = PacketStore;
|