fix: region filter nodes by ADVERT observers, not data packets

The previous approach matched nodes via data packet hashes seen by
regional observers — but mesh packets propagate everywhere, so nearly
every node matched every region (550/558).

New approach: _advertByObserver index tracks which observers saw each
node's ADVERT packets. ADVERTs are local broadcasts that indicate
physical presence, so they're the correct signal for geographic filtering.

Also fixes role counts to reflect filtered results, not global totals.
This commit is contained in:
you
2026-03-21 08:31:55 +00:00
parent b3599694c6
commit 47fa32f982
2 changed files with 61 additions and 21 deletions

View File

@@ -30,6 +30,7 @@ class PacketStore {
// Track which hashes are indexed per node pubkey (avoid dupes in byNode)
this._nodeHashIndex = new Map(); // pubkey → Set<hash>
this._advertByObserver = new Map(); // pubkey → Set<observer_id> (ADVERT-only, for region filtering)
this.loaded = false;
this.stats = { totalLoaded: 0, totalObservations: 0, evicted: 0, inserts: 0, queries: 0 };
@@ -150,6 +151,17 @@ class PacketStore {
this.stats.totalObservations++;
}
}
// Post-load: build ADVERT-by-observer index (needs all observations loaded first)
for (const tx of this.packets) {
if (tx.payload_type === 4 && tx.decoded_json) {
try {
const d = JSON.parse(tx.decoded_json);
if (d.pubKey) this._indexAdvertObservers(d.pubKey, tx);
} catch {}
}
}
console.log(`[PacketStore] ADVERT observer index: ${this._advertByObserver.size} nodes tracked`);
}
/** Fallback: load from legacy packets table */
@@ -242,7 +254,7 @@ class PacketStore {
if (decoded.srcPubKey) keys.add(decoded.srcPubKey);
for (const k of keys) {
if (!this._nodeHashIndex.has(k)) this._nodeHashIndex.set(k, new Set());
if (this._nodeHashIndex.get(k).has(tx.hash)) continue; // already indexed
if (this._nodeHashIndex.get(k).has(tx.hash)) continue;
this._nodeHashIndex.get(k).add(tx.hash);
if (!this.byNode.has(k)) this.byNode.set(k, []);
this.byNode.get(k).push(tx);
@@ -250,6 +262,26 @@ class PacketStore {
} catch {}
}
/** Track which observers saw an ADVERT from a given pubkey */
_indexAdvertObservers(pubkey, tx) {
if (!this._advertByObserver.has(pubkey)) this._advertByObserver.set(pubkey, new Set());
const s = this._advertByObserver.get(pubkey);
for (const obs of tx.observations) {
if (obs.observer_id) s.add(obs.observer_id);
}
}
/** Get node pubkeys whose ADVERTs were seen by any of the given observer IDs */
getNodesByAdvertObservers(observerIds) {
const result = new Set();
for (const [pubkey, observers] of this._advertByObserver) {
for (const obsId of observerIds) {
if (observers.has(obsId)) { result.add(pubkey); break; }
}
}
return result;
}
/** Remove oldest transmissions when over memory limit */
_evict() {
while (this.packets.length > this.maxPackets) {
@@ -348,6 +380,18 @@ class PacketStore {
}
this.stats.totalObservations++;
// Update ADVERT observer index for live ingestion
if (tx.payload_type === 4 && obs.observer_id && tx.decoded_json) {
try {
const d = JSON.parse(tx.decoded_json);
if (d.pubKey) {
if (!this._advertByObserver.has(d.pubKey)) this._advertByObserver.set(d.pubKey, new Set());
this._advertByObserver.get(d.pubKey).add(obs.observer_id);
}
} catch {}
}
this._evict();
this.stats.inserts++;
}
@@ -568,6 +612,7 @@ class PacketStore {
byHash: this.byHash.size,
byObserver: this.byObserver.size,
byNode: this.byNode.size,
advertByObserver: this._advertByObserver.size,
}
};
}

View File

@@ -1036,43 +1036,38 @@ app.get('/api/nodes', (req, res) => {
if (ms) { where.push('last_seen > @since'); params.since = new Date(Date.now() - ms).toISOString(); }
}
// Region filtering: if region param is set, only include nodes seen by observers in those regions
// Region filtering: if region param is set, only include nodes whose ADVERTs were seen by regional observers
const regionObsIds = getObserverIdsForRegions(region);
let regionNodeKeys = null;
if (regionObsIds && regionObsIds.size > 0) {
// Collect all packet hashes seen by regional observers
const regionalHashes = new Set();
for (const obsId of regionObsIds) {
const obs = pktStore.byObserver.get(obsId);
if (obs) for (const o of obs) regionalHashes.add(o.hash);
}
// Find node pubkeys from those packets (via _nodeHashIndex)
regionNodeKeys = new Set();
for (const [pubkey, hashes] of pktStore._nodeHashIndex) {
for (const h of hashes) {
if (regionalHashes.has(h)) { regionNodeKeys.add(pubkey); break; }
}
}
regionNodeKeys = pktStore.getNodesByAdvertObservers(regionObsIds);
}
const clause = where.length ? 'WHERE ' + where.join(' AND ') : '';
const sortMap = { name: 'name ASC', lastSeen: 'last_seen DESC', packetCount: 'advert_count DESC' };
const order = sortMap[sortBy] || 'last_seen DESC';
let nodes, total;
let nodes, total, filteredAll;
if (regionNodeKeys) {
const allNodes = db.db.prepare(`SELECT * FROM nodes ${clause} ORDER BY ${order}`).all(params);
const filtered = allNodes.filter(n => regionNodeKeys.has(n.public_key));
total = filtered.length;
nodes = filtered.slice(Number(offset), Number(offset) + Number(limit));
filteredAll = allNodes.filter(n => regionNodeKeys.has(n.public_key));
total = filteredAll.length;
nodes = filteredAll.slice(Number(offset), Number(offset) + Number(limit));
} else {
nodes = db.db.prepare(`SELECT * FROM nodes ${clause} ORDER BY ${order} LIMIT @limit OFFSET @offset`).all({ ...params, limit: Number(limit), offset: Number(offset) });
total = db.db.prepare(`SELECT COUNT(*) as count FROM nodes ${clause}`).get(params).count;
filteredAll = null;
}
const counts = {};
for (const r of ['repeater', 'room', 'companion', 'sensor']) {
counts[r + 's'] = db.db.prepare(`SELECT COUNT(*) as count FROM nodes WHERE role = ?`).get(r).count;
if (filteredAll) {
for (const r of ['repeater', 'room', 'companion', 'sensor']) {
counts[r + 's'] = filteredAll.filter(n => n.role === r).length;
}
} else {
for (const r of ['repeater', 'room', 'companion', 'sensor']) {
counts[r + 's'] = db.db.prepare(`SELECT COUNT(*) as count FROM nodes WHERE role = ?`).get(r).count;
}
}
// Compute hash_size for each node from ADVERT path byte or path hop lengths