refactor: wire server.js to use server-helpers.js for shared functions

Replace duplicated function definitions in server.js with imports from
server-helpers.js. Functions replaced: loadConfigFile, loadThemeFile,
buildHealthConfig, getHealthMs, isHashSizeFlipFlop, computeContentHash,
geoDist, deriveHashtagChannelKey, buildBreakdown, updateHashSizeForPacket,
rebuildHashSizeMap, requireApiKey, CONFIG_PATHS, THEME_PATHS.

disambiguateHops kept in server.js due to behavioral differences in the
distance sanity check (server version nulls lat/lon on unreliable hops
and adds ambiguous field in output mapping).

server.js: 3201 → 3001 lines (-200 lines, -224 deletions/+24 insertions)
All tests pass (unit, e2e, frontend).
This commit is contained in:
you
2026-03-24 01:32:18 +00:00
parent 5b496a8235
commit 616bea0100
+24 -224
View File
@@ -7,169 +7,47 @@ const { WebSocketServer } = require('ws');
const mqtt = require('mqtt');
const path = require('path');
const fs = require('fs');
// Config: bind-mounted config.json first, then fall back to data/ dir
const CONFIG_PATHS = [
path.join(__dirname, 'config.json'),
path.join(__dirname, 'data', 'config.json')
];
function loadConfigFile() {
for (const p of CONFIG_PATHS) {
try { return JSON.parse(fs.readFileSync(p, 'utf8')); } catch {}
}
return {};
}
const helpers = require('./server-helpers');
const { loadConfigFile, loadThemeFile, buildHealthConfig, getHealthMs: _getHealthMs,
isHashSizeFlipFlop, computeContentHash, geoDist, deriveHashtagChannelKey,
buildBreakdown: _buildBreakdown, disambiguateHops: _disambiguateHops,
updateHashSizeForPacket: _updateHashSizeForPacket,
rebuildHashSizeMap: _rebuildHashSizeMap,
requireApiKey: _requireApiKeyFactory,
CONFIG_PATHS, THEME_PATHS } = helpers;
const config = loadConfigFile();
const decoder = require('./decoder');
const PAYLOAD_TYPES = decoder.PAYLOAD_TYPES;
const { nodeNearRegion, IATA_COORDS } = require('./iata-coords');
// Health thresholds — configurable with sensible defaults
const _ht = config.healthThresholds || {};
const HEALTH = {
infraDegradedMs: _ht.infraDegradedMs || 86400000,
infraSilentMs: _ht.infraSilentMs || 259200000,
nodeDegradedMs: _ht.nodeDegradedMs || 3600000,
nodeSilentMs: _ht.nodeSilentMs || 86400000
};
function getHealthMs(role) {
const isInfra = role === 'repeater' || role === 'room';
return {
degradedMs: isInfra ? HEALTH.infraDegradedMs : HEALTH.nodeDegradedMs,
silentMs: isInfra ? HEALTH.infraSilentMs : HEALTH.nodeSilentMs
};
}
const HEALTH = buildHealthConfig(config);
function getHealthMs(role) { return _getHealthMs(role, HEALTH); }
const MAX_HOP_DIST_SERVER = config.maxHopDist || 1.8;
const crypto = require('crypto');
const PacketStore = require('./packet-store');
// --- Precomputed hash_size map (updated on new packets, not per-request) ---
const _hashSizeMap = new Map(); // pubkey → latest hash_size (number)
const _hashSizeAllMap = new Map(); // pubkey → Set of all hash_sizes seen
const _hashSizeSeqMap = new Map(); // pubkey → array of hash_sizes in chronological order (oldest first)
function _rebuildHashSizeMap() {
_hashSizeMap.clear();
_hashSizeAllMap.clear();
_hashSizeSeqMap.clear();
// Pass 1: from ADVERT packets (most authoritative — path byte bits 7-6)
// packets array is sorted newest-first, so first-match = newest ADVERT
for (const p of pktStore.packets) {
if (p.payload_type === 4 && p.raw_hex) {
try {
const d = JSON.parse(p.decoded_json || '{}');
const pk = d.pubKey || d.public_key;
if (pk) {
const pathByte = parseInt(p.raw_hex.slice(2, 4), 16);
const hs = ((pathByte >> 6) & 0x3) + 1;
if (!_hashSizeMap.has(pk)) _hashSizeMap.set(pk, hs);
if (!_hashSizeAllMap.has(pk)) _hashSizeAllMap.set(pk, new Set());
_hashSizeAllMap.get(pk).add(hs);
// Build sequence (will reverse later since packets are newest-first)
if (!_hashSizeSeqMap.has(pk)) _hashSizeSeqMap.set(pk, []);
_hashSizeSeqMap.get(pk).push(hs);
}
} catch {}
}
}
// Reverse sequences to chronological order (oldest first)
for (const [, seq] of _hashSizeSeqMap) seq.reverse();
// Pass 2: for nodes without ADVERTs, derive from path hop lengths in any packet
for (const p of pktStore.packets) {
if (p.path_json) {
try {
const hops = JSON.parse(p.path_json);
if (hops.length > 0) {
const hopLen = hops[0].length / 2;
if (hopLen >= 1 && hopLen <= 4) {
const pathByte = p.raw_hex ? parseInt(p.raw_hex.slice(2, 4), 16) : -1;
const hs = pathByte >= 0 ? ((pathByte >> 6) & 0x3) + 1 : hopLen;
if (p.decoded_json) {
const d = JSON.parse(p.decoded_json);
const pk = d.pubKey || d.public_key;
if (pk && !_hashSizeMap.has(pk)) _hashSizeMap.set(pk, hs);
}
}
}
} catch {}
}
}
function _rebuildHashSizeMapLocal() {
_rebuildHashSizeMap(pktStore.packets, _hashSizeMap, _hashSizeAllMap, _hashSizeSeqMap);
}
// Detect flip-flopping hash sizes (not just upgrades)
// A clean upgrade: [1,1,1,2,2,2] — sizes change once and stay. That's fine.
// Flip-flop: [1,2,1,2] or [2,1,2] — sizes go back and forth. That's a bug.
function _isHashSizeFlipFlop(pubkey) {
const seq = _hashSizeSeqMap.get(pubkey);
if (!seq || seq.length < 3) return false; // need enough samples
const allSizes = _hashSizeAllMap.get(pubkey);
if (!allSizes || allSizes.size < 2) return false; // only one size = no issue
// Count transitions (size changes)
let transitions = 0;
for (let i = 1; i < seq.length; i++) {
if (seq[i] !== seq[i - 1]) transitions++;
}
// A clean upgrade has exactly 1 transition. Flip-flop has 2+.
return transitions >= 2;
return isHashSizeFlipFlop(_hashSizeSeqMap.get(pubkey), _hashSizeAllMap.get(pubkey));
}
// Update hash_size for a single new packet (called on insert)
function _updateHashSizeForPacket(p) {
if (p.payload_type === 4 && p.raw_hex) {
try {
const d = typeof p.decoded_json === 'string' ? JSON.parse(p.decoded_json || '{}') : (p.decoded_json || {});
const pk = d.pubKey || d.public_key;
if (pk) {
const pathByte = parseInt(p.raw_hex.slice(2, 4), 16);
const hs = ((pathByte >> 6) & 0x3) + 1;
_hashSizeMap.set(pk, hs);
if (!_hashSizeAllMap.has(pk)) _hashSizeAllMap.set(pk, new Set());
_hashSizeAllMap.get(pk).add(hs);
if (!_hashSizeSeqMap.has(pk)) _hashSizeSeqMap.set(pk, []);
_hashSizeSeqMap.get(pk).push(hs); // already chronological for live packets
}
} catch {}
} else if (p.path_json && p.decoded_json) {
try {
const d = typeof p.decoded_json === 'string' ? JSON.parse(p.decoded_json) : p.decoded_json;
const pk = d.pubKey || d.public_key;
if (pk && !_hashSizeMap.has(pk)) {
const hops = typeof p.path_json === 'string' ? JSON.parse(p.path_json) : p.path_json;
if (hops.length > 0) {
const pathByte = p.raw_hex ? parseInt(p.raw_hex.slice(2, 4), 16) : -1;
const hs = pathByte >= 0 ? ((pathByte >> 6) & 0x3) + 1 : (hops[0].length / 2);
if (hs >= 1 && hs <= 4) _hashSizeMap.set(pk, hs);
}
}
} catch {}
}
function _updateHashSizeForPacketLocal(p) {
_updateHashSizeForPacket(p, _hashSizeMap, _hashSizeAllMap, _hashSizeSeqMap);
}
// API key middleware for write endpoints
const API_KEY = config.apiKey || null;
function requireApiKey(req, res, next) {
if (!API_KEY) return next(); // no key configured = open (dev mode)
const provided = req.headers['x-api-key'] || req.query.apiKey;
if (provided === API_KEY) return next();
return res.status(401).json({ error: 'Invalid or missing API key' });
}
const requireApiKey = _requireApiKeyFactory(API_KEY);
// Compute a content hash from raw hex: header byte + payload (skipping path hops)
// This correctly groups retransmissions of the same packet (same content, different paths)
function computeContentHash(rawHex) {
try {
const buf = Buffer.from(rawHex, 'hex');
if (buf.length < 2) return rawHex.slice(0, 16);
const pathByte = buf[1];
const hashSize = ((pathByte >> 6) & 0x3) + 1;
const hashCount = pathByte & 0x3F;
const pathBytes = hashSize * hashCount;
const payloadStart = 2 + pathBytes;
const payload = buf.subarray(payloadStart);
const toHash = Buffer.concat([Buffer.from([buf[0]]), payload]);
return crypto.createHash('sha256').update(toHash).digest('hex').slice(0, 16);
} catch { return rawHex.slice(0, 16); }
}
const db = require('./db');
const pktStore = new PacketStore(db, config.packetStore || {}).load();
_rebuildHashSizeMap();
_rebuildHashSizeMapLocal();
// Backfill: fix roles for nodes whose adverts were decoded with old bitfield flags
// ADV_TYPE is a 4-bit enum (0=none, 1=chat, 2=repeater, 3=room, 4=sensor), not individual bits
@@ -220,10 +98,6 @@ function getCachedNodes(includeRole) {
const configuredChannelKeys = config.channelKeys || {};
const hashChannels = Array.isArray(config.hashChannels) ? config.hashChannels : [];
function deriveHashtagChannelKey(channelName) {
return crypto.createHash('sha256').update(channelName).digest('hex').slice(0, 32);
}
const derivedHashChannelKeys = {};
for (const rawChannel of hashChannels) {
if (typeof rawChannel !== 'string') continue;
@@ -454,18 +328,6 @@ function getObserverIdsForRegions(regionParam) {
return ids;
}
// Theme: hot-load from theme.json (same dir as config.json, or data/ dir)
const THEME_PATHS = [
path.join(__dirname, 'theme.json'),
path.join(__dirname, 'data', 'theme.json')
];
function loadThemeFile() {
for (const p of THEME_PATHS) {
try { return JSON.parse(fs.readFileSync(p, 'utf8')); } catch {}
}
return {};
}
app.get('/api/config/theme', (req, res) => {
const cfg = loadConfigFile();
const theme = loadThemeFile();
@@ -631,9 +493,6 @@ function broadcast(msg) {
// When an advert arrives later with a full pubkey matching the prefix, upsertNode will upgrade it
const hopNodeCache = new Set(); // Avoid repeated DB lookups for known hops
// Shared distance helper (degrees, ~111km/lat, ~85km/lon at 37°N)
function geoDist(lat1, lon1, lat2, lon2) { return Math.sqrt((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2); }
// Sequential hop disambiguation: resolve 1-byte prefixes to best-matching nodes
// Returns array of {hop, name, lat, lon, pubkey, ambiguous, unreliable} per hop
function disambiguateHops(hops, allNodes) {
@@ -819,7 +678,7 @@ for (const source of mqttSources) {
path_json: JSON.stringify(decoded.path.hops),
decoded_json: JSON.stringify(decoded.payload),
};
const packetId = pktStore.insert(pktData); _updateHashSizeForPacket(pktData);
const packetId = pktStore.insert(pktData); _updateHashSizeForPacketLocal(pktData);
try { db.insertTransmission(pktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
if (decoded.path.hops.length > 0) {
@@ -920,7 +779,7 @@ for (const source of mqttSources) {
path_json: JSON.stringify([]),
decoded_json: JSON.stringify(advert),
};
const packetId = pktStore.insert(advertPktData); _updateHashSizeForPacket(advertPktData);
const packetId = pktStore.insert(advertPktData); _updateHashSizeForPacketLocal(advertPktData);
try { db.insertTransmission(advertPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, hash: advertPktData.hash, raw: advertPktData.raw_hex, decoded: { header: { payloadTypeName: 'ADVERT' }, payload: advert } } });
}
@@ -953,7 +812,7 @@ for (const source of mqttSources) {
path_json: JSON.stringify([]),
decoded_json: JSON.stringify(channelMsg),
};
const packetId = pktStore.insert(chPktData); _updateHashSizeForPacket(chPktData);
const packetId = pktStore.insert(chPktData); _updateHashSizeForPacketLocal(chPktData);
try { db.insertTransmission(chPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, hash: chPktData.hash, raw: chPktData.raw_hex, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } });
broadcast({ type: 'message', data: { id: packetId, hash: chPktData.hash, decoded: { header: { payloadTypeName: 'GRP_TXT' }, payload: channelMsg } } });
@@ -976,7 +835,7 @@ for (const source of mqttSources) {
path_json: JSON.stringify(dm.hops || []),
decoded_json: JSON.stringify(dm),
};
const packetId = pktStore.insert(dmPktData); _updateHashSizeForPacket(dmPktData);
const packetId = pktStore.insert(dmPktData); _updateHashSizeForPacketLocal(dmPktData);
try { db.insertTransmission(dmPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, hash: dmPktData.hash, raw: dmPktData.raw_hex, decoded: { header: { payloadTypeName: 'TXT_MSG' }, payload: dm } } });
return;
@@ -998,7 +857,7 @@ for (const source of mqttSources) {
path_json: JSON.stringify(trace.hops || trace.path || []),
decoded_json: JSON.stringify(trace),
};
const packetId = pktStore.insert(tracePktData); _updateHashSizeForPacket(tracePktData);
const packetId = pktStore.insert(tracePktData); _updateHashSizeForPacketLocal(tracePktData);
try { db.insertTransmission(tracePktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
broadcast({ type: 'packet', data: { id: packetId, hash: tracePktData.hash, raw: tracePktData.raw_hex, decoded: { header: { payloadTypeName: 'TRACE' }, payload: trace } } });
return;
@@ -1117,66 +976,7 @@ app.get('/api/packets/:id', (req, res) => {
});
function buildBreakdown(rawHex, decoded) {
if (!rawHex) return {};
const buf = Buffer.from(rawHex, 'hex');
const ranges = [];
// Header
ranges.push({ start: 0, end: 0, color: 'red', label: 'Header' });
if (buf.length < 2) return { ranges };
// Path length byte
ranges.push({ start: 1, end: 1, color: 'orange', label: 'Path Length' });
const header = decoder.decodePacket(rawHex, channelKeys);
let offset = 2;
// Transport codes
if (header.transportCodes) {
ranges.push({ start: 2, end: 5, color: 'blue', label: 'Transport Codes' });
offset = 6;
}
// Path data
const pathByte = buf[1];
const hashSize = (pathByte >> 6) + 1;
const hashCount = pathByte & 0x3F;
const pathBytes = hashSize * hashCount;
if (pathBytes > 0) {
ranges.push({ start: offset, end: offset + pathBytes - 1, color: 'green', label: 'Path' });
}
const payloadStart = offset + pathBytes;
// Payload
if (payloadStart < buf.length) {
ranges.push({ start: payloadStart, end: buf.length - 1, color: 'yellow', label: 'Payload' });
// Sub-ranges for ADVERT
if (decoded && decoded.type === 'ADVERT') {
const ps = payloadStart;
const subRanges = [];
subRanges.push({ start: ps, end: ps + 31, color: '#FFD700', label: 'PubKey' });
subRanges.push({ start: ps + 32, end: ps + 35, color: '#FFA500', label: 'Timestamp' });
subRanges.push({ start: ps + 36, end: ps + 99, color: '#FF6347', label: 'Signature' });
if (buf.length > ps + 100) {
subRanges.push({ start: ps + 100, end: ps + 100, color: '#7FFFD4', label: 'Flags' });
let off = ps + 101;
const flags = buf[ps + 100];
if (flags & 0x10 && buf.length >= off + 8) {
subRanges.push({ start: off, end: off + 3, color: '#87CEEB', label: 'Latitude' });
subRanges.push({ start: off + 4, end: off + 7, color: '#87CEEB', label: 'Longitude' });
off += 8;
}
if (flags & 0x80 && off < buf.length) {
subRanges.push({ start: off, end: buf.length - 1, color: '#DDA0DD', label: 'Name' });
}
}
ranges.push(...subRanges);
}
}
return { ranges };
return _buildBreakdown(rawHex, decoded, decoder.decodePacket, channelKeys);
}
// Decode-only endpoint (no DB insert)
@@ -1212,7 +1012,7 @@ app.post('/api/packets', requireApiKey, (req, res) => {
path_json: JSON.stringify(decoded.path.hops),
decoded_json: JSON.stringify(decoded.payload),
};
const packetId = pktStore.insert(apiPktData); _updateHashSizeForPacket(apiPktData);
const packetId = pktStore.insert(apiPktData); _updateHashSizeForPacketLocal(apiPktData);
try { db.insertTransmission(apiPktData); } catch (e) { console.error('[dual-write] transmission insert error:', e.message); }
if (decoded.path.hops.length > 0) {