M3: Restructure in-memory store around transmissions

- load() reads from transmissions JOIN observations (with legacy fallback)
- byHash now maps hash → single transmission object (1:1)
- byNode maps pubkey → [transmissions] (deduped, no inflated observations)
- byTransmission is the primary data structure
- byId maps observation IDs for backward-compat packet detail links
- byObserver still maps observer_id → [observations]
- getSiblings() returns observations from transmission
- findPacketsForNode() returns unique transmissions
- query()/queryGrouped() work with transmission-centric model
- All returned objects maintain backward-compatible fields
- SQLite-only fallback path (NO_MEMORY_STORE=1) unchanged
- Tested: 11.6K transmissions from 37.5K observations (3.2x dedup)
This commit is contained in:
you
2026-03-20 20:44:32 +00:00
parent baa60cac0f
commit 84f33aef7b
+353 -143
View File
@@ -1,8 +1,9 @@
'use strict';
/**
* In-memory packet store — loads all packets from SQLite on startup,
* In-memory packet store — loads transmissions + observations from SQLite on startup,
* serves reads from RAM, writes to both RAM + SQLite.
* M3: Restructured around transmissions (deduped by hash) with observations.
* Caps memory at configurable limit (default 1GB).
*/
class PacketStore {
@@ -16,17 +17,22 @@ class PacketStore {
// SQLite-only mode: skip RAM loading, all reads go to DB
this.sqliteOnly = process.env.NO_MEMORY_STORE === '1';
// Core storage: array sorted by timestamp DESC (newest first)
// Primary storage: transmissions sorted by first_seen DESC (newest first)
// Each transmission looks like a packet for backward compat
this.packets = [];
// Indexes
this.byId = new Map();
this.byHash = new Map(); // hash → [packet, ...]
this.byObserver = new Map(); // observer_id → [packet, ...]
this.byNode = new Map(); // pubkey → [packet, ...]
this.byTransmission = new Map(); // hash → {id, hash, first_seen, payload_type, decoded_json, observations: []}
this.byId = new Map(); // observation_id → observation object (backward compat for packet detail links)
this.byHash = new Map(); // hash → transmission object (1:1)
this.byObserver = new Map(); // observer_id → [observation objects]
this.byNode = new Map(); // pubkey → [transmission objects] (deduped)
this.byTransmission = new Map(); // hash → transmission object (same refs as byHash)
// Track which hashes are indexed per node pubkey (avoid dupes in byNode)
this._nodeHashIndex = new Map(); // pubkey → Set<hash>
this.loaded = false;
this.stats = { totalLoaded: 0, evicted: 0, inserts: 0, queries: 0 };
this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 };
}
/** Load all packets from SQLite into memory */
@@ -36,88 +42,310 @@ class PacketStore {
this.loaded = true;
return this;
}
const t0 = Date.now();
// Check if normalized schema exists
const hasTransmissions = this.db.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transmissions'"
).get();
if (hasTransmissions) {
this._loadNormalized();
} else {
this._loadLegacy();
}
this.stats.totalLoaded = this.packets.length;
this.loaded = true;
const elapsed = Date.now() - t0;
console.log(`[PacketStore] Loaded ${this.packets.length} transmissions (${this.stats.totalObservations} observations) in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
return this;
}
/** Load from normalized transmissions + observations tables */
_loadNormalized() {
const rows = this.db.prepare(`
SELECT t.id AS transmission_id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id AS observation_id, o.observer_id, o.observer_name, o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp AS obs_timestamp
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
ORDER BY t.first_seen DESC, o.timestamp DESC
`).all();
for (const row of rows) {
if (this.packets.length >= this.maxPackets && !this.byTransmission.has(row.hash)) break;
let tx = this.byTransmission.get(row.hash);
if (!tx) {
tx = {
id: row.transmission_id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.first_seen,
timestamp: row.first_seen,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
// Filled from first observation for backward compat
observer_id: null,
observer_name: null,
snr: null,
rssi: null,
path_json: null,
direction: null,
};
this.byTransmission.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.push(tx);
this._indexByNode(tx);
}
if (row.observation_id != null) {
const obs = {
id: row.observation_id,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.obs_timestamp,
// Carry transmission fields for backward compat
hash: row.hash,
raw_hex: row.raw_hex,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
route_type: row.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
// Fill first observation data into transmission for backward compat
if (tx.observer_id == null && obs.observer_id) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
tx.direction = obs.direction;
}
// byId maps observation IDs for packet detail links
this.byId.set(obs.id, obs);
// byObserver
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
}
}
}
/** Fallback: load from legacy packets table */
_loadLegacy() {
const rows = this.db.prepare(
'SELECT * FROM packets ORDER BY timestamp DESC'
).all();
for (const row of rows) {
if (this.packets.length >= this.maxPackets) break;
this._index(row);
this.packets.push(row);
this._indexLegacy(row);
}
this.stats.totalLoaded = this.packets.length;
this.loaded = true;
const elapsed = Date.now() - t0;
console.log(`[PacketStore] Loaded ${this.packets.length} packets in ${elapsed}ms (${Math.round(this.packets.length * this.estPacketBytes / 1024 / 1024)}MB est)`);
return this;
}
/** Index a packet into all lookup maps */
_index(pkt) {
this.byId.set(pkt.id, pkt);
if (pkt.hash) {
if (!this.byHash.has(pkt.hash)) this.byHash.set(pkt.hash, []);
this.byHash.get(pkt.hash).push(pkt);
/** Index a legacy packet row (old flat structure) — builds transmission + observation */
_indexLegacy(pkt) {
let tx = this.byTransmission.get(pkt.hash);
if (!tx) {
tx = {
id: pkt.id,
raw_hex: pkt.raw_hex,
hash: pkt.hash,
first_seen: pkt.timestamp,
timestamp: pkt.timestamp,
route_type: pkt.route_type,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
observations: [],
observation_count: 0,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
snr: pkt.snr,
rssi: pkt.rssi,
path_json: pkt.path_json,
direction: pkt.direction,
};
this.byTransmission.set(pkt.hash, tx);
this.byHash.set(pkt.hash, tx);
this.packets.push(tx);
this._indexByNode(tx);
}
if (pkt.timestamp < tx.first_seen) {
tx.first_seen = pkt.timestamp;
tx.timestamp = pkt.timestamp;
}
const obs = {
id: pkt.id,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
direction: pkt.direction,
snr: pkt.snr,
rssi: pkt.rssi,
score: pkt.score,
path_json: pkt.path_json,
timestamp: pkt.timestamp,
hash: pkt.hash,
raw_hex: pkt.raw_hex,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
route_type: pkt.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
this.byId.set(pkt.id, obs);
if (pkt.observer_id) {
if (!this.byObserver.has(pkt.observer_id)) this.byObserver.set(pkt.observer_id, []);
this.byObserver.get(pkt.observer_id).push(pkt);
this.byObserver.get(pkt.observer_id).push(obs);
}
// Index by node pubkeys mentioned in decoded_json
this._indexByNode(pkt);
// Index by transmission (dedup view)
if (pkt.hash) {
if (!this.byTransmission.has(pkt.hash)) {
this.byTransmission.set(pkt.hash, {
id: pkt.id,
hash: pkt.hash,
first_seen: pkt.timestamp,
payload_type: pkt.payload_type,
decoded_json: pkt.decoded_json,
observations: [],
});
}
const tx = this.byTransmission.get(pkt.hash);
if (pkt.timestamp < tx.first_seen) tx.first_seen = pkt.timestamp;
tx.observations.push({
id: pkt.id,
observer_id: pkt.observer_id,
observer_name: pkt.observer_name,
direction: pkt.direction,
snr: pkt.snr,
rssi: pkt.rssi,
score: pkt.score,
path_json: pkt.path_json,
timestamp: pkt.timestamp,
});
}
this.stats.totalObservations++;
}
/** Extract node pubkeys/names from decoded_json and index */
_indexByNode(pkt) {
if (!pkt.decoded_json) return;
/** Extract node pubkeys from decoded_json and index transmission in byNode */
_indexByNode(tx) {
if (!tx.decoded_json) return;
try {
const decoded = JSON.parse(pkt.decoded_json);
const decoded = JSON.parse(tx.decoded_json);
const keys = new Set();
if (decoded.pubKey) keys.add(decoded.pubKey);
if (decoded.destPubKey) keys.add(decoded.destPubKey);
if (decoded.srcPubKey) keys.add(decoded.srcPubKey);
for (const k of keys) {
if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set());
if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed
this._nodeHashIndex.get(k).add(tx.hash);
if (!this.byNode.has(k)) this.byNode.set(k, []);
this.byNode.get(k).push(pkt);
this.byNode.get(k).push(tx);
}
} catch {}
}
/** Remove oldest transmissions when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
const old = this.packets.pop();
this.byHash.delete(old.hash);
this.byTransmission.delete(old.hash);
// Remove observations from byId and byObserver
for (const obs of old.observations) {
this.byId.delete(obs.id);
if (obs.observer_id && this.byObserver.has(obs.observer_id)) {
const arr = this.byObserver.get(obs.observer_id).filter(o => o.id !== obs.id);
if (arr.length) this.byObserver.set(obs.observer_id, arr); else this.byObserver.delete(obs.observer_id);
}
}
// Skip node index cleanup (expensive, low value)
this.stats.evicted++;
}
}
/** Insert a new packet (to both memory and SQLite) */
insert(packetData) {
const id = this.dbModule.insertPacket(packetData);
const row = this.dbModule.getPacket(id);
if (row && !this.sqliteOnly) {
// Update or create transmission in memory
let tx = this.byTransmission.get(row.hash);
if (!tx) {
tx = {
id: row.id,
raw_hex: row.raw_hex,
hash: row.hash,
first_seen: row.timestamp,
timestamp: row.timestamp,
route_type: row.route_type,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
observations: [],
observation_count: 0,
observer_id: row.observer_id,
observer_name: row.observer_name,
snr: row.snr,
rssi: row.rssi,
path_json: row.path_json,
direction: row.direction,
};
this.byTransmission.set(row.hash, tx);
this.byHash.set(row.hash, tx);
this.packets.unshift(tx); // newest first
this._indexByNode(tx);
} else {
// Update first_seen if earlier
if (row.timestamp < tx.first_seen) {
tx.first_seen = row.timestamp;
tx.timestamp = row.timestamp;
}
}
// Add observation
const obs = {
id: row.id,
observer_id: row.observer_id,
observer_name: row.observer_name,
direction: row.direction,
snr: row.snr,
rssi: row.rssi,
score: row.score,
path_json: row.path_json,
timestamp: row.timestamp,
hash: row.hash,
raw_hex: row.raw_hex,
payload_type: row.payload_type,
decoded_json: row.decoded_json,
route_type: row.route_type,
};
tx.observations.push(obs);
tx.observation_count++;
// Update transmission's display fields if this is first observation
if (tx.observations.length === 1) {
tx.observer_id = obs.observer_id;
tx.observer_name = obs.observer_name;
tx.snr = obs.snr;
tx.rssi = obs.rssi;
tx.path_json = obs.path_json;
}
this.byId.set(obs.id, obs);
if (obs.observer_id) {
if (!this.byObserver.has(obs.observer_id)) this.byObserver.set(obs.observer_id, []);
this.byObserver.get(obs.observer_id).push(obs);
}
this.stats.totalObservations++;
this._evict();
this.stats.inserts++;
}
return id;
}
/**
* Find ALL packets referencing a node — by pubkey index + name + pubkey text search.
* Single source of truth for "get packets for node X".
* Returns unique transmissions (deduped).
* @param {string} nodeIdOrName - pubkey or friendly name
* @param {Array} [fromPackets] - packet array to filter (defaults to this.packets)
* @returns {{ packets: Array, pubkey: string, nodeName: string }}
@@ -132,49 +360,24 @@ class PacketStore {
if (row) { pubkey = row.public_key; nodeName = row.name || nodeIdOrName; }
} catch {}
// Combine: index hits + text search by both name and pubkey
// Combine: index hits + text search
const indexed = this.byNode.get(pubkey);
const idSet = indexed ? new Set(indexed.map(p => p.id)) : new Set();
const hashSet = indexed ? new Set(indexed.map(t => t.hash)) : new Set();
const source = fromPackets || this.packets;
const packets = source.filter(p =>
idSet.has(p.id) ||
(p.decoded_json && (p.decoded_json.includes(nodeName) || p.decoded_json.includes(pubkey)))
const packets = source.filter(t =>
hashSet.has(t.hash) ||
(t.decoded_json && (t.decoded_json.includes(nodeName) || t.decoded_json.includes(pubkey)))
);
return { packets, pubkey, nodeName };
}
/** Remove oldest packets when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
const old = this.packets.pop();
this.byId.delete(old.id);
// Remove from hash index
if (old.hash && this.byHash.has(old.hash)) {
const arr = this.byHash.get(old.hash).filter(p => p.id !== old.id);
if (arr.length) this.byHash.set(old.hash, arr); else this.byHash.delete(old.hash);
}
// Remove from observer index
if (old.observer_id && this.byObserver.has(old.observer_id)) {
const arr = this.byObserver.get(old.observer_id).filter(p => p.id !== old.id);
if (arr.length) this.byObserver.set(old.observer_id, arr); else this.byObserver.delete(old.observer_id);
}
// Skip node index cleanup for eviction (expensive, low value)
this.stats.evicted++;
}
}
/** Insert a new packet (to both memory and SQLite) */
insert(packetData) {
const id = this.dbModule.insertPacket(packetData);
const row = this.dbModule.getPacket(id);
if (row) {
this.packets.unshift(row); // newest first
this._index(row);
this._evict();
this.stats.inserts++;
}
return id;
/** Count transmissions and observations for a node */
countForNode(pubkey) {
const txs = this.byNode.get(pubkey) || [];
let observations = 0;
for (const tx of txs) observations += tx.observation_count;
return { transmissions: txs.length, observations };
}
/** Query packets with filters — all from memory (or SQLite in fallback mode) */
@@ -187,9 +390,11 @@ class PacketStore {
// Use indexes for single-key filters when possible
if (hash && !type && !route && !region && !observer && !since && !until && !node) {
results = this.byHash.get(hash) || [];
const tx = this.byHash.get(hash);
results = tx ? [tx] : [];
} else if (observer && !type && !route && !region && !hash && !since && !until && !node) {
results = this.byObserver.get(observer) || [];
// For observer filter, find unique transmissions where any observation matches
results = this._transmissionsForObserver(observer);
} else if (node && !type && !route && !region && !observer && !hash && !since && !until) {
results = this.findPacketsForNode(node).packets;
} else {
@@ -202,18 +407,22 @@ class PacketStore {
const r = Number(route);
results = results.filter(p => p.route_type === r);
}
if (observer) results = results.filter(p => p.observer_id === observer);
if (hash) results = results.filter(p => p.hash === hash);
if (observer) results = this._transmissionsForObserver(observer, results);
if (hash) {
const tx = this.byHash.get(hash);
results = tx ? results.filter(p => p.hash === hash) : [];
}
if (since) results = results.filter(p => p.timestamp > since);
if (until) results = results.filter(p => p.timestamp < until);
if (region) {
// Need to look up observers for this region
const regionObservers = new Set();
try {
const obs = this.db.prepare('SELECT id FROM observers WHERE iata = ?').all(region);
obs.forEach(o => regionObservers.add(o.id));
} catch {}
results = results.filter(p => regionObservers.has(p.observer_id));
results = results.filter(p =>
p.observations.some(o => regionObservers.has(o.observer_id))
);
}
if (node) {
results = this.findPacketsForNode(node, results).packets;
@@ -237,52 +446,52 @@ class PacketStore {
return { packets: paginated, total };
}
/** Query with groupByHash — aggregate packets by content hash */
/** Find unique transmissions that have at least one observation from given observer */
_transmissionsForObserver(observerId, fromTransmissions) {
if (fromTransmissions) {
return fromTransmissions.filter(tx =>
tx.observations.some(o => o.observer_id === observerId)
);
}
// Use byObserver index: get observations, then unique transmissions
const obs = this.byObserver.get(observerId) || [];
const seen = new Set();
const result = [];
for (const o of obs) {
if (!seen.has(o.hash)) {
seen.add(o.hash);
const tx = this.byTransmission.get(o.hash);
if (tx) result.push(tx);
}
}
return result;
}
/** Query with groupByHash — now trivial since packets ARE transmissions */
queryGrouped({ limit = 50, offset = 0, type, route, region, observer, hash, since, until, node } = {}) {
this.stats.queries++;
if (this.sqliteOnly) return this._queryGroupedSQLite({ limit, offset, type, route, region, observer, hash, since, until, node });
// Get filtered results first
// Get filtered transmissions
const { packets: filtered, total: filteredTotal } = this.query({
limit: 999999, offset: 0, type, route, region, observer, hash, since, until, node
});
// Group by hash
const groups = new Map();
for (const p of filtered) {
const h = p.hash || p.id.toString();
if (!groups.has(h)) {
groups.set(h, {
hash: p.hash,
observer_count: new Set(),
count: 0,
latest: p.timestamp,
observer_id: p.observer_id,
observer_name: p.observer_name,
path_json: p.path_json,
payload_type: p.payload_type,
raw_hex: p.raw_hex,
decoded_json: p.decoded_json,
});
}
const g = groups.get(h);
g.count++;
if (p.observer_id) g.observer_count.add(p.observer_id);
if (p.timestamp > g.latest) {
g.latest = p.timestamp;
}
// Keep longest path
if (p.path_json && (!g.path_json || p.path_json.length > g.path_json.length)) {
g.path_json = p.path_json;
g.raw_hex = p.raw_hex;
}
}
// Sort by latest DESC, paginate
const sorted = [...groups.values()]
.map(g => ({ ...g, observer_count: g.observer_count.size }))
.sort((a, b) => b.latest.localeCompare(a.latest));
// Already grouped by hash — just format for backward compat
const sorted = filtered.map(tx => ({
hash: tx.hash,
count: tx.observation_count,
observer_count: new Set(tx.observations.map(o => o.observer_id).filter(Boolean)).size,
latest: tx.observations.length ? tx.observations.reduce((max, o) => o.timestamp > max ? o.timestamp : max, tx.observations[0].timestamp) : tx.timestamp,
observer_id: tx.observer_id,
observer_name: tx.observer_name,
path_json: tx.path_json,
payload_type: tx.payload_type,
raw_hex: tx.raw_hex,
decoded_json: tx.decoded_json,
observation_count: tx.observation_count,
})).sort((a, b) => b.latest.localeCompare(a.latest));
const total = sorted.length;
const paginated = sorted.slice(Number(offset), Number(offset) + Number(limit));
@@ -302,25 +511,26 @@ class PacketStore {
return results.reverse();
}
/** Get a single packet by ID */
/** Get a single packet by ID — checks observation IDs first (backward compat) */
getById(id) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE id = ?').get(id) || null;
return this.byId.get(id) || null;
}
/** Get all siblings of a packet (same hash) */
/** Get all siblings of a packet (same hash) — returns observations array */
getSiblings(hash) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets WHERE hash = ? ORDER BY timestamp DESC').all(hash);
return this.byHash.get(hash) || [];
const tx = this.byTransmission.get(hash);
return tx ? tx.observations : [];
}
/** Get all packets (raw array reference — do not mutate) */
/** Get all transmissions (backward compat — returns packets array) */
all() {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all();
return this.packets;
}
/** Get all packets matching a filter function */
/** Get all transmissions matching a filter function */
filter(fn) {
if (this.sqliteOnly) return this.db.prepare('SELECT * FROM packets ORDER BY timestamp DESC').all().filter(fn);
return this.packets.filter(fn);