Files
meshcore-analyzer/packet-store.js
Kpa-clawbot c99a1ac756 fix: keep hash on observations, enrich only where raw_hex/decoded_json needed
- Add hash field back to observation objects in packet-store.js (both
  _loadNormalized and insert paths) — it's only 16 chars, negligible
  memory vs the big fields raw_hex + decoded_json
- Fix /api/analytics/signal: look up raw_hex from transmission via
  byTxId for packet size calculation
- Fix /api/observers/:id/analytics: enrich obsPackets so payload_type
  and decoded_json are available for type breakdown and node buckets
- Endpoints /api/nodes/bulk-health, /api/nodes/network-status, and
  /api/analytics/subpaths now work because observations carry hash

All 625 tests pass (unit + integration).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-26 23:27:45 -07:00

753 lines
29 KiB
JavaScript

'use strict';
/**
* In-memory packet store — loads transmissions + observations from SQLite on startup,
* serves reads from RAM, writes to both RAM + SQLite.
* M3: Restructured around transmissions (deduped by hash) with observations.
* Caps memory at configurable limit (default 1GB).
*/
class PacketStore {
constructor(dbModule, config = {}) {
this.dbModule = dbModule; // The full db module (has .db, .insertTransmission, .getPacket)
this.db = dbModule.db; // Raw better-sqlite3 instance for queries
this.maxBytes = (config.maxMemoryMB || 1024) * 1024 * 1024;
this.estPacketBytes = config.estimatedPacketBytes || 450;
this.maxPackets = Math.floor(this.maxBytes / this.estPacketBytes);
// SQLite-only mode: skip RAM loading, all reads go to DB
this.sqliteOnly = process.env.NO_MEMORY_STORE === '1';
// Primary storage: transmissions sorted by first_seen DESC (newest first)
// Each transmission looks like a packet for backward compat
this.packets = [];
// Indexes
this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links)
this.byTxId = new Map(); // transmission_id → transmission object
this.byHash = new Map(); // hash → transmission object (1:1)
this.byObserver = new Map(); // observer_id → [observation objects]
this.byNode = new Map(); // pubkey → [transmission objects] (deduped)
// Track which hashes are indexed per node pubkey (avoid dupes in byNode)
this._nodeHashIndex = new Map(); // pubkey → Set<hash>
this._advertByObserver = new Map(); // pubkey → Set<observer_id> (ADVERT-only, for region filtering)
this.loaded = false;
this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 };
}
/** Load all packets from SQLite into memory */
load() {
if (this.sqliteOnly) {
console.log('[PacketStore] SQLite-only mode (NO_MEMORY_STORE=1) — all reads go to database');
this.loaded = true;
return this;
}
const t0 = Date.now();
// Check if normalized schema exists
const hasTransmissions = this.db.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'"
).get();
if (hasTransmissions) {
this._loadNormalized();
} else {
this._loadLegacy();
}
this.stats.totalLoaded = this.packets.length;
this.loaded = true;
const elapsed = Date.now() - t0;
console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
return this;
}
/** Load from normalized transmissions + observations tables */
_loadNormalized() {
// Detect v3 schema (observer_idx instead of observer_id in observations)
const obsCols = this.db.pragma('table_info(observations)').map(c => c.name);
const isV3 = obsCols.includes('observer_idx');
const sql = isV3
? `SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id AS observation_id, obs.id AS observer_id, obs.name AS observer_name, o.direction,
o.snr, o.rssi, o.score, o.path_json, datetime(o.timestamp, 'unixepoch') AS obs_timestamp
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
ORDER BY t.first_seen DESC, o.timestamp DESC`
: `SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id AS observation_id, o.observer_id, o.observer_name, o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
ORDER BY t.first_seen DESC, o.timestamp DESC`;
for (const row of this.db.prepare(sql).iterate()) {
if (this.packets.length >= this.maxPackets && !this.byHash.has(row.hash)) break;
let tx = this.byHash.get(row.hash);
if (!tx) {
tx = {
id: row.transmission_id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.first_seen,
timestamp: row.first_seen,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
// Filled from first observation for backward compat
observer_id: null,
observer_name: null,
snr: null,
rssi: null,
path_json: null,
direction: null,
};
this.byHash.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.push(tx);
this.byTxId.set(tx.id, tx);
this._indexByNode(tx);
}
if (row.observation_id != null) {
const obs = {
id: row.observation_id,
transmission_id: tx.id,
hash: tx.hash,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.obs_timestamp,
};
// Dedup: skip if same observer + same path already loaded
const isDupeLoad = tx.observations.some(o => o.observer_id === obs.observer_id && (o.path_json || '') === (obs.path_json || ''));
if (isDupeLoad) continue;
tx.observations.push(obs);
tx.observation_count++;
// Fill first observation data into transmission for backward compat
if (tx.observer_id == null && obs.observer_id) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
tx.direction = obs.direction;
}
// byId maps observation IDs for packet detail links
this.byId.set(obs.id, obs);
// byObserver
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
}
}
// Post-load: set each transmission's display path to the LONGEST observation path
// (most representative of mesh topology — short paths are just nearby observers)
for (const tx of this.packets) {
if (tx.observations.length > 0) {
let best = tx.observations[0];
let bestLen = 0;
try { bestLen = JSON.parse(best.path_json || '[]').length; } catch {}
for (let i = 1; i < tx.observations.length; i++) {
let len = 0;
try { len = JSON.parse(tx.observations[i].path_json || '[]').length; } catch {}
if (len > bestLen) { best = tx.observations[i]; bestLen = len; }
}
tx.observer_id = best.observer_id;
tx.observer_name = best.observer_name;
tx.snr = best.snr;
tx.rssi = best.rssi;
tx.path_json = best.path_json;
tx.direction = best.direction;
}
}
// Post-load: build ADVERT-by-observer index (needs all observations loaded first)
for (const tx of this.packets) {
if (tx.payload_type === 4 && tx.decoded_json) {
try {
const d = JSON.parse(tx.decoded_json);
if (d.pubKey) this._indexAdvertObservers(d.pubKey, tx);
} catch {}
}
}
console.log(`[PacketStore] ADVERT observer index: ${this._advertByObserver.size} nodes tracked`);
}
/** Fallback: load from legacy packets table */
_loadLegacy() {
for (const row of this.db.prepare(
'SELECT * FROM packets_v ORDER BY timestamp DESC'
).iterate()) {
if (this.packets.length >= this.maxPackets) break;
this._indexLegacy(row);
}
}
/** Index a legacy packet row (old flat structure) — builds transmission + observation */
_indexLegacy(pkt) {
let tx = this.byHash.get(pkt.hash);
if (!tx) {
tx = {
id: pkt.id,
raw_hex: pkt.raw_hex,
hash: pkt.hash,
first_seen: pkt.timestamp,
timestamp: pkt.timestamp,
route_type: pkt.route_type,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
observations: [],
observation_count: 0,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
snr: pkt.snr,
rssi: pkt.rssi,
path_json: pkt.path_json,
direction: pkt.direction,
};
this.byHash.set(pkt.hash, tx);
this.byHash.set(pkt.hash, tx);
this.packets.push(tx);
this.byTxId.set(tx.id, tx);
this._indexByNode(tx);
}
if (pkt.timestamp < tx.first_seen) {
tx.first_seen = pkt.timestamp;
tx.timestamp = pkt.timestamp;
}
// Update display path if new observation has longer path
let newPathLen = 0, curPathLen = 0;
try { newPathLen = JSON.parse(pkt.path_json || '[]').length; } catch {}
try { curPathLen = JSON.parse(tx.path_json || '[]').length; } catch {}
if (newPathLen > curPathLen) {
tx.observer_id = pkt.observer_id;
tx.observer_name = pkt.observer_name;
tx.path_json = pkt.path_json;
}
const obs = {
id: pkt.id,
transmission_id: tx.id,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
direction: pkt.direction,
snr: pkt.snr,
rssi: pkt.rssi,
score: pkt.score,
path_json: pkt.path_json,
timestamp: pkt.timestamp,
};
// Dedup: skip if same observer + same path already recorded for this transmission
const isDupe = tx.observations.some(o => o.observer_id === obs.observer_id && (o.path_json || '') === (obs.path_json || ''));
if (isDupe) return tx;
tx.observations.push(obs);
tx.observation_count++;
this.byId.set(pkt.id, obs);
if (pkt.observer_id) {
if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []);
this.byObserver.get(pkt.observer_id).push(obs);
}
this.stats.totalObservations++;
}
/** Extract node pubkeys from decoded_json and index transmission in byNode */
_indexByNode(tx) {
if (!tx.decoded_json) return;
try {
const decoded = JSON.parse(tx.decoded_json);
const keys = new Set();
if (decoded.pubKey) keys.add(decoded.pubKey);
if (decoded.destPubKey) keys.add(decoded.destPubKey);
if (decoded.srcPubKey) keys.add(decoded.srcPubKey);
for (const k of keys) {
if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set());
if (this._nodeHashIndex.get(k).has(tx.hash)) continue;
this._nodeHashIndex.get(k).add(tx.hash);
if (!this.byNode.has(k)) this.byNode.set(k, []);
this.byNode.get(k).push(tx);
}
} catch {}
}
/** Track which observers saw an ADVERT from a given pubkey */
_indexAdvertObservers(pubkey, tx) {
if (!this._advertByObserver.has(pubkey)) this._advertByObserver.set(pubkey, new Set());
const s = this._advertByObserver.get(pubkey);
for (const obs of tx.observations) {
if (obs.observer_id) s.add(obs.observer_id);
}
}
/** Get node pubkeys whose ADVERTs were seen by any of the given observer IDs */
getNodesByAdvertObservers(observerIds) {
const result = new Set();
for (const [pubkey, observers] of this._advertByObserver) {
for (const obsId of observerIds) {
if (observers.has(obsId)) { result.add(pubkey); break; }
}
}
return result;
}
/** Remove oldest transmissions when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
const old = this.packets.pop();
this.byHash.delete(old.hash);
this.byHash.delete(old.hash);
this.byTxId.delete(old.id);
// Remove observations from byId and byObserver
for (const obs of old.observations) {
this.byId.delete(obs.id);
if (obs.observer_id && this.byObserver.has(obs.observer_id)) {
const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id);
if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id);
}
}
// Skip node index cleanup (expensive, low value)
this.stats.evicted++;
}
}
/** Insert a new packet (to both memory and SQLite) */
insert(packetData) {
// Write to normalized tables and get the transmission ID
const txResult = this.dbModule.insertTransmission ? this.dbModule.insertTransmission(packetData) : null;
const transmissionId = txResult ? txResult.transmissionId : null;
const observationId = txResult ? txResult.observationId : null;
// Build row directly from packetData — avoids view ID mismatch issues
const row = {
id: observationId,
raw_hex: packetData.raw_hex,
hash: packetData.hash,
timestamp: packetData.timestamp,
route_type: packetData.route_type,
payload_type: packetData.payload_type,
payload_version: packetData.payload_version,
decoded_json: packetData.decoded_json,
observer_id: packetData.observer_id,
observer_name: packetData.observer_name,
snr: packetData.snr,
rssi: packetData.rssi,
path_json: packetData.path_json,
direction: packetData.direction,
};
if (!this.sqliteOnly) {
// Update or create transmission in memory
let tx = this.byHash.get(row.hash);
if (!tx) {
tx = {
id: transmissionId || row.id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.timestamp,
timestamp: row.timestamp,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
observer_id: row.observer_id,
observer_name: row.observer_name,
snr: row.snr,
rssi: row.rssi,
path_json: row.path_json,
direction: row.direction,
};
this.byHash.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.unshift(tx); // newest first
this.byTxId.set(tx.id, tx);
this._indexByNode(tx);
} else {
// Update first_seen if earlier
if (row.timestamp < tx.first_seen) {
tx.first_seen = row.timestamp;
tx.timestamp = row.timestamp;
}
// Update display path if new observation has longer path
let newPathLen = 0, curPathLen = 0;
try { newPathLen = JSON.parse(row.path_json || '[]').length; } catch {}
try { curPathLen = JSON.parse(tx.path_json || '[]').length; } catch {}
if (newPathLen > curPathLen) {
tx.observer_id = row.observer_id;
tx.observer_name = row.observer_name;
tx.path_json = row.path_json;
}
}
// Add observation
const obs = {
id: row.id,
transmission_id: tx.id,
hash: tx.hash,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.timestamp,
};
// Dedup: skip if same observer + same path already recorded for this transmission
const isDupe = tx.observations.some(o => o.observer_id === obs.observer_id && (o.path_json || '') === (obs.path_json || ''));
if (!isDupe) {
tx.observations.push(obs);
tx.observation_count++;
}
// Update transmission's display fields if this is first observation
if (tx.observations.length === 1) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
}
this.byId.set(obs.id, obs);
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
// Update ADVERT observer index for live ingestion
if (tx.payload_type === 4 && obs.observer_id && tx.decoded_json) {
try {
const d = JSON.parse(tx.decoded_json);
if (d.pubKey) {
if (!this._advertByObserver.has(d.pubKey)) this._advertByObserver.set(d.pubKey, new Set());
this._advertByObserver.get(d.pubKey).add(obs.observer_id);
}
} catch {}
}
this._evict();
this.stats.inserts++;
}
return observationId || transmissionId;
}
/**
* Find ALL packets referencing a node — by pubkey index + name + pubkey text search.
* Returns unique transmissions (deduped).
* @param {string} nodeIdOrName - pubkey or friendly name
* @param {Array} [fromPackets] - packet array to filter (defaults to this.packets)
* @returns {{ packets: Array, pubkey: string, nodeName: string }}
*/
findPacketsForNode(nodeIdOrName, fromPackets) {
let pubkey = nodeIdOrName;
let nodeName = nodeIdOrName;
// Always resolve to get both pubkey and name
try {
const row = this.db.prepare("SELECT public_key, name FROM nodes WHERE public_key = ? OR name = ? LIMIT 1").get(nodeIdOrName, nodeIdOrName);
if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; }
} catch {}
// Combine: index hits + text search
const indexed = this.byNode.get(pubkey);
const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set();
const source = fromPackets || this.packets;
const packets = source.filter(t =>
hashSet.has(t.hash) ||
(t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey)))
);
return { packets, pubkey, nodeName };
}
/** Count transmissions and observations for a node */
countForNode(pubkey) {
const txs = this.byNode.get(pubkey) || [];
let observations = 0;
for (const tx of txs) observations += tx.observation_count;
return { transmissions: txs.length, observations };
}
/** Query packets with filters — all from memory (or SQLite in fallback mode) */
query({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node, order = 'DESC' } = {}) {
this.stats.queries++;
if (this.sqliteOnly) return this._querySQLite({ limit, offset, type, route, region, observer, hash, since, until, node, order });
let results = this.packets;
// Use indexes for single-key filters when possible
if (hash && !type && !route && !region && !observer && !since && !until && !node) {
const tx = this.byHash.get(hash);
results = tx ? [tx] : [];
} else if (observer && !type && !route && !region && !hash && !since && !until && !node) {
// For observer filter, find unique transmissions where any observation matches
results = this._transmissionsForObserver(observer);
} else if (node && !type && !route && !region && !observer && !hash && !since && !until) {
results = this.findPacketsForNode(node).packets;
} else {
// Apply filters sequentially
if (type !== undefined) {
const t = Number(type);
results = results.filter(p => p.payload_type === t);
}
if (route !== undefined) {
const r = Number(route);
results = results.filter(p => p.route_type === r);
}
if (observer) results = this._transmissionsForObserver(observer, results);
if (hash) {
const h = hash.toLowerCase();
const tx = this.byHash.get(h);
results = tx ? results.filter(p => p.hash === h) : [];
}
if (since) results = results.filter(p => p.timestamp > since);
if (until) results = results.filter(p => p.timestamp < until);
if (region) {
const regionObservers = new Set();
try {
const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region);
obs.forEach(o => regionObservers.add(o.id));
} catch {}
results = results.filter(p =>
p.observations.some(o => regionObservers.has(o.observer_id))
);
}
if (node) {
results = this.findPacketsForNode(node, results).packets;
}
}
const total = results.length;
// Sort
if (order === 'ASC') {
results = results.slice().sort((a, b) => {
if (a.timestamp < b.timestamp) return -1;
if (a.timestamp > b.timestamp) return 1;
return 0;
});
}
// Default DESC — packets array is already sorted newest-first
// Paginate
const paginated = results.slice(Number(offset), Number(offset) + Number(limit));
return { packets: paginated, total };
}
/** Find unique transmissions that have at least one observation from given observer */
_transmissionsForObserver(observerId, fromTransmissions) {
if (fromTransmissions) {
return fromTransmissions.filter(tx =>
tx.observations.some(o => o.observer_id === observerId)
);
}
// Use byObserver index: get observations, then unique transmissions
const obs = this.byObserver.get(observerId) || [];
const seen = new Set();
const result = [];
for (const o of obs) {
const txId = o.transmission_id;
if (!seen.has(txId)) {
seen.add(txId);
const tx = this.byTxId.get(txId);
if (tx) result.push(tx);
}
}
return result;
}
/** Query with groupByHash — now trivial since packets ARE transmissions */
queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) {
this.stats.queries++;
if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node });
// Get filtered transmissions
const { packets: filtered, total: filteredTotal } = this.query({
limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node
});
// Already grouped by hash — just format for backward compat
const sorted = filtered.map(tx => ({
hash: tx.hash,
first_seen: tx.first_seen || tx.timestamp,
count: tx.observation_count,
observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size,
latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp,
observer_id: tx.observer_id,
observer_name: tx.observer_name,
path_json: tx.path_json,
payload_type: tx.payload_type,
route_type: tx.route_type,
raw_hex: tx.raw_hex,
decoded_json: tx.decoded_json,
observation_count: tx.observation_count,
snr: tx.snr,
rssi: tx.rssi,
})).sort((a, b) => b.latest.localeCompare(a.latest));
const total = sorted.length;
const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit));
return { packets: paginated, total };
}
/** Get timestamps for sparkline */
getTimestamps(since) {
if (this.sqliteOnly) {
return this.db.prepare('SELECT timestamp FROM packets_v WHERE timestamp > ? ORDER BY timestamp ASC').all(since).map(r => r.timestamp);
}
const results = [];
for (const p of this.packets) {
if (p.timestamp <= since) break;
results.push(p.timestamp);
}
return results.reverse();
}
/** Get a single packet by ID — checks observation IDs first (backward compat) */
getById(id) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v WHERE id = ?').get(id) || null;
const obs = this.byId.get(id) || null;
return this._enrichObs(obs);
}
/** Get a transmission by its transmission table ID */
getByTxId(id) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM transmissions WHERE id = ?').get(id) || null;
return this.byTxId.get(id) || null;
}
/** Get all siblings of a packet (same hash) — returns enriched observations array */
getSiblings(hash) {
const h = hash.toLowerCase();
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v WHERE hash = ? ORDER BY timestamp DESC').all(h);
const tx = this.byHash.get(h);
return tx ? tx.observations.map(o => this._enrichObs(o)) : [];
}
/** Get all transmissions (backward compat — returns packets array) */
all() {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v ORDER BY timestamp DESC').all();
return this.packets;
}
/** Get all transmissions matching a filter function */
filter(fn) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets_v ORDER BY timestamp DESC').all().filter(fn);
return this.packets.filter(fn);
}
/** Enrich a lean observation with transmission fields (for API responses) */
_enrichObs(obs) {
if (!obs) return null;
const tx = this.byTxId.get(obs.transmission_id);
if (!tx) return obs;
return {
...obs,
hash: tx.hash,
raw_hex: tx.raw_hex,
payload_type: tx.payload_type,
decoded_json: tx.decoded_json,
route_type: tx.route_type,
};
}
/** Enrich an array of observations with transmission fields */
enrichObservations(observations) {
if (!observations || !observations.length) return observations;
return observations.map(o => this._enrichObs(o));
}
/** Memory stats */
getStats() {
return {
...this.stats,
inMemory: this.sqliteOnly ? 0 : this.packets.length,
sqliteOnly: this.sqliteOnly,
maxPackets: this.maxPackets,
estimatedMB: this.sqliteOnly ? 0 : Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024),
maxMB: Math.round(this.maxBytes / 1024 / 1024),
indexes: {
byHash: this.byHash.size,
byObserver: this.byObserver.size,
byNode: this.byNode.size,
advertByObserver: this._advertByObserver.size,
}
};
}
/** SQLite fallback: query with filters */
_querySQLite({ limit, offset, type, route, region, observer, hash, since, until, node, order }) {
const where = []; const params = [];
if (type !== undefined) { where.push('payload_type = ?'); params.push(Number(type)); }
if (route !== undefined) { where.push('route_type = ?'); params.push(Number(route)); }
if (observer) { where.push('observer_id = ?'); params.push(observer); }
if (hash) { where.push('hash = ?'); params.push(hash.toLowerCase()); }
if (since) { where.push('timestamp > ?'); params.push(since); }
if (until) { where.push('timestamp < ?'); params.push(until); }
if (region) { where.push('observer_id IN (SELECT id FROM observers WHERE iata = ?)'); params.push(region); }
if (node) { try { const nr = this.db.prepare('SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1').get(node, node); const pk = nr ? nr.public_key : node; where.push('decoded_json LIKE ?'); params.push('%' + pk + '%'); } catch(e) { where.push('decoded_json LIKE ?'); params.push('%' + node + '%'); } }
const w = where.length ? 'WHERE ' + where.join(' AND ') : '';
const total = this.db.prepare(`SELECT COUNT(*) as c FROM packets_v ${w}`).get(...params).c;
const packets = this.db.prepare(`SELECT * FROM packets_v ${w} ORDER BY timestamp ${order === 'ASC' ? 'ASC' : 'DESC'} LIMIT ? OFFSET ?`).all(...params, limit, offset);
return { packets, total };
}
/** SQLite fallback: grouped query */
_queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node }) {
const where = []; const params = [];
if (type !== undefined) { where.push('payload_type = ?'); params.push(Number(type)); }
if (route !== undefined) { where.push('route_type = ?'); params.push(Number(route)); }
if (observer) { where.push('observer_id = ?'); params.push(observer); }
if (hash) { where.push('hash = ?'); params.push(hash.toLowerCase()); }
if (since) { where.push('timestamp > ?'); params.push(since); }
if (until) { where.push('timestamp < ?'); params.push(until); }
if (region) { where.push('observer_id IN (SELECT id FROM observers WHERE iata = ?)'); params.push(region); }
if (node) { try { const nr = this.db.prepare('SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1').get(node, node); const pk = nr ? nr.public_key : node; where.push('decoded_json LIKE ?'); params.push('%' + pk + '%'); } catch(e) { where.push('decoded_json LIKE ?'); params.push('%' + node + '%'); } }
const w = where.length ? 'WHERE ' + where.join(' AND ') : '';
const sql = `SELECT hash, COUNT(*) as count, COUNT(DISTINCT observer_id) as observer_count,
MAX(timestamp) as latest, MIN(observer_id) as observer_id, MIN(observer_name) as observer_name,
MIN(path_json) as path_json, MIN(payload_type) as payload_type, MIN(route_type) as route_type,
MIN(raw_hex) as raw_hex, MIN(decoded_json) as decoded_json, MIN(snr) as snr, MIN(rssi) as rssi
FROM packets_v ${w} GROUP BY hash ORDER BY latest DESC LIMIT ? OFFSET ?`;
const packets = this.db.prepare(sql).all(...params, limit, offset);
const countSql = `SELECT COUNT(DISTINCT hash) as c FROM packets_v ${w}`;
const total = this.db.prepare(countSql).get(...params).c;
return { packets, total };
}
}
module.exports = PacketStore;