- Added a version indicator to the bottom of the Web Viewer
- Added ability to filter by packet type in Packet Capture Service.
- Improved mesh graph efficiency, added documentation for how to configure mesh graph calculations (or disable it) on lightweight nodes like Raspberry Pi Zero 2s.
- Simplify contact removal logic. Occasional errors will still occur until race condition in meshcore_py 
- Improvements to documentation.
This commit is contained in:
Adam Gessaman
2026-02-21 09:53:14 -08:00
committed by GitHub
19 changed files with 1063 additions and 851 deletions
+11
View File
@@ -57,6 +57,15 @@ jobs:
type=sha,prefix=sha-
type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' || github.ref == 'refs/heads/master' }}
- name: Set version for web viewer footer
id: version
run: |
if [[ "${{ github.ref }}" == refs/tags/* ]]; then
echo "version=${{ github.ref_name }}" >> $GITHUB_OUTPUT
else
echo "version=dev" >> $GITHUB_OUTPUT
fi
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
@@ -64,6 +73,8 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
MESHCORE_BOT_VERSION=${{ steps.version.outputs.version }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64
+4
View File
@@ -43,6 +43,10 @@ COPY --from=builder /root/.local /home/meshcore/.local
# Set working directory
WORKDIR /app
# Version for web viewer footer (set at build time; e.g. --build-arg MESHCORE_BOT_VERSION=v1.2.3)
ARG MESHCORE_BOT_VERSION
ENV MESHCORE_BOT_VERSION=${MESHCORE_BOT_VERSION}
# Copy application files
COPY --chown=meshcore:meshcore . /app/
+20 -3
View File
@@ -801,11 +801,25 @@ graph_path_validation_max_bonus = 0.3
# Lower values = stronger bonus from observation count. 50.0 means 50 observations = 0.15 bonus
graph_path_validation_obs_divisor = 50.0
# Load only recent edges on startup (days, 0 = load all)
# Useful for large graphs. Set to 0 to load all edges, or N to load only edges seen in last N days.
# For development with frequent restarts, 0 (load all) is recommended to maintain graph quality
# Load only recent edges on startup (days, 0 = load all historical edges)
# Edges older than this are skipped at startup to bound initial memory usage.
# The in-code default is 14 days when this setting is absent from config.ini.
# Recommended values:
# 0 - Load all historical edges (servers with ample RAM, e.g. x86 VM)
# 14 - Good balance of coverage vs. memory (default for unconfigured installs)
# 7 - Reduced memory footprint for Raspberry Pi Zero 2 W
# Note: edges older than graph_edge_expiration_days are never loaded regardless of this value.
graph_startup_load_days = 0
# Enable graph data capture from incoming packets (default: true)
# When true, the bot observes routing paths from advertisements, messages, and trace
# packets and stores edges in the mesh graph.
# When false, NO new edge data is collected and the background batch writer thread is
# not started — reducing both CPU and RAM overhead. Any edges already in the database
# are still available for graph_based_validation if that is also enabled.
# Set to false on devices that don't use the path command and want minimal overhead.
graph_capture_enabled = true
# Star bias multiplier for path command
# When a contact is starred in the web viewer, multiply its selection score by this value
# Higher values = stronger preference for starred repeaters
@@ -1224,6 +1238,7 @@ mqtt_enabled = true
# mqttN_topic_packets = # Packets topic template (uses placeholders below)
# mqttN_topic_prefix = # Legacy topic prefix (fallback if topic_status/topic_packets not set)
# mqttN_client_id = # MQTT client ID (optional, auto-generated from bot name)
# mqttN_upload_packet_types = # Comma-separated packet types to upload (e.g. 2,4); empty = all
#
# Topic template placeholders:
# {IATA} - Uppercase IATA code (e.g., SEA)
@@ -1243,6 +1258,7 @@ mqtt1_topic_status = meshcore/{IATA}/{PUBLIC_KEY}/status
mqtt1_topic_packets = meshcore/{IATA}/{PUBLIC_KEY}/packets
mqtt1_websocket_path = /mqtt
mqtt1_client_id =
mqtt1_upload_packet_types =
# MQTT Broker 2 - Let's Mesh Analyzer (EU)
mqtt2_enabled = true
@@ -1256,6 +1272,7 @@ mqtt2_topic_status = meshcore/{IATA}/{PUBLIC_KEY}/status
mqtt2_topic_packets = meshcore/{IATA}/{PUBLIC_KEY}/packets
mqtt2_websocket_path = /mqtt
mqtt2_client_id =
mqtt2_upload_packet_types =
# Stats and status publishing
# Enable stats in status messages
+49
View File
@@ -27,3 +27,52 @@ Without `--upgrade`, the script does *not* update the service file (systemd/laun
### How can I generate a custom command reference for my bot users?
See [Custom command reference website](command-reference-website.md): it explains how to use `generate_website.py` to build a single-page HTML from your config (with optional styles) and upload it to your site.
## Hardware and performance
### How do I run meshcore-bot on a Raspberry Pi Zero 2 W?
The Pi Zero 2 W has 512 MB of RAM. The bot and the web viewer are two separate
Python processes; together they use roughly 300 MB on a busy mesh, which leaves
little headroom. Follow the two steps below to keep things comfortable.
#### Step 1 — Run the bot only (saves ~150 MB)
The web viewer is optional. If you don't need the browser-based dashboard on
the Pi itself, disable it and access it from another machine instead:
```ini
[Web_Viewer]
enabled = false
auto_start = false
```
The bot continues to work normally; the web viewer just won't start on the Pi.
If you still want the dashboard, run the viewer on a desktop or server that
shares the same database file (see [MeshCore Bot Data Viewer](web-viewer.md)).
#### Step 2 — Tune the Mesh Graph (saves another 50100 MB on busy meshes)
Even with the web viewer off, the Mesh Graph can grow large. Add the following
to the `[Path_Command]` section of your `config.ini`:
```ini
[Path_Command]
# Limit startup memory: only load edges seen in the last 7 days.
# Edges older than this have near-zero path confidence anyway.
graph_startup_load_days = 7
# Evict edges from RAM after 7 days without a new observation.
graph_edge_expiration_days = 7
# Write graph updates in batches rather than on every packet.
graph_write_strategy = batched
# If you don't use the !path command at all, disable graph capture
# entirely to eliminate the background thread and all graph overhead.
# graph_capture_enabled = false
```
These settings do not affect path prediction accuracy: edges older than a few
days carry negligible confidence due to the 48-hour recency half-life used by
the scoring algorithm.
+33 -1
View File
@@ -85,6 +85,37 @@ mqtt2_username = user
mqtt2_password = pass
```
#### Filtering by packet type
You can limit which packet types are uploaded to each broker with `mqttN_upload_packet_types`. Use a comma-separated list of type numbers; if unset or empty, all packet types are uploaded.
```ini
# Only upload text messages and adverts to this broker
mqtt1_upload_packet_types = 2, 4
# Broker 2 gets everything (default)
# mqtt2_upload_packet_types =
```
**Packet type reference:**
| Type | Name | Description |
|------|------------|--------------------|
| 0 | REQ | Request |
| 1 | RESPONSE | Response |
| 2 | TXT_MSG | Text message |
| 3 | ACK | Acknowledgment |
| 4 | ADVERT | Advertisement |
| 5 | GRP_TXT | Group text |
| 6 | GRP_DATA | Group data |
| 7 | ANON_REQ | Anonymous request |
| 8 | PATH | Path |
| 9 | TRACE | Trace |
| 10 | MULTIPART | Multipart |
| 1115| Type11RAW_CUSTOM | Other types |
Packets that are excluded by this filter are still written to the output file (if configured) and still counted; they are only skipped for MQTT upload to that broker. Debug logs will show "Skipping" for those packets.
### Topic Templates
Placeholders:
@@ -170,8 +201,9 @@ Common issues:
### No Packets Being Published
1. **Verify MQTT connection** - Check logs for "Connected to MQTT broker"
2. **Check packet count** - Service logs "Captured packet #N" for each packet
2. **Check packet count** - Service logs "Captured packet #N" (or "Skipping packet #N" when filtered) for each packet
3. **Verify topics** - Ensure topics match broker expectations
4. **Check upload filter** - If `mqttN_upload_packet_types` is set, only those types are uploaded. DEBUG Logs show "packet type X not in [Y, Z]" when a packet is skipped
---
+9 -2
View File
@@ -201,8 +201,15 @@ These settings control how graph edges are stored in the database.
**`graph_startup_load_days`** (days, 0 = load all)
- Load only edges seen in last N days on startup
- `0` = load all edges (recommended for development)
- Default: `0`
- `0` = load all edges (use on servers with ample RAM)
- Default: `14` (set to `0` in `config.ini` to load all)
**`graph_capture_enabled`** (boolean)
- When `false`, no new edge data is collected from packets and the background
batch writer thread is not started — reducing CPU and RAM overhead
- Edges already in the database are still used for path validation
- Set to `false` on devices that don't use the path command
- Default: `true`
## Preset Configurations
+66 -2
View File
@@ -147,9 +147,73 @@ You can keep or remove the old `bot_data.db` file after verifying the viewer wor
## Troubleshooting
### Web viewer not accessible (e.g. Orange Pi / SBC)
If the viewer does not load from another device (e.g. from your phone or PC while the bot runs on an Orange Pi), work through these steps on the Pi.
1. **Confirm config**
- In `config.ini` under `[Web_Viewer]`:
- `enabled = true`
- `auto_start = true` (if you want it to start with the bot)
- `host = 0.0.0.0` (required for access from other devices; `127.0.0.1` is localhost only)
- `port = 8080` (or another port 102465535)
- Restart the bot after changing config.
2. **Check that the viewer process is running**
```bash
# From project root on the Pi
ss -tlnp | grep 8080
# or
netstat -tlnp | grep 8080
```
If nothing listens on your port, the viewer did not start or has exited.
3. **Inspect viewer logs**
- When run by the bot, the viewer writes to:
- `logs/web_viewer_stdout.log`
- `logs/web_viewer_stderr.log`
- Look for Python tracebacks, "Address already in use", or missing dependencies (e.g. Flask, flask-socketio).
- Optional: run the viewer manually to see errors in the terminal:
```bash
cd /path/to/meshcore-bot
python3 modules/web_viewer/app.py --config config.ini --host 0.0.0.0 --port 8080
```
4. **Check integration startup**
- Bot logs may show: `Web viewer integration failed: ...` or `Web viewer integration initialized`.
- If integration failed, the viewer subprocess is never started; fix the error shown (e.g. invalid `host` or `port` in config).
5. **Firewall**
- Many SBC images (e.g. Orange Pi, Armbian minimal) do **not** ship with a firewall; if `curl` to localhost works and `host = 0.0.0.0`, the blocker may be network (WiFi client isolation, different subnet, or router). Check from a device on the same LAN using `http://<PI_IP>:8080`.
- If your system uses **ufw**:
```bash
sudo ufw status
sudo ufw allow 8080/tcp
sudo ufw reload
```
- If `ufw` is not installed (e.g. `sudo: ufw: command not found`), you may have no host firewall—thats common on embedded images. To allow the port with **iptables** (often available when ufw is not):
```bash
sudo iptables -I INPUT -p tcp --dport 8080 -j ACCEPT
```
(Rules may not persist across reboots unless you use a persistence method for your distro.)
- If you prefer ufw, install it (e.g. `sudo apt install ufw`) and use the ufw commands above.
6. **Test from the Pi first**
```bash
curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8080/
```
If this returns `200`, the viewer is running and the issue is binding or firewall. If you use `host = 0.0.0.0`, then try from another device: `http://<PI_IP>:8080`.
7. **Standalone run (no bot)**
- To rule out bot integration issues, start the viewer by itself (same config path so it finds the DB):
```bash
python3 modules/web_viewer/app.py --config config.ini --host 0.0.0.0 --port 8080
```
- If `restart_viewer.sh` is used, note it binds to `127.0.0.1` by default; for network access run the command above with `--host 0.0.0.0` or edit the script.
### Flask Not Found
```bash
pip3 install flask
pip3 install flask flask-socketio
```
### Database Not Found
@@ -158,7 +222,7 @@ pip3 install flask
### Port Already in Use
- Change the port in `config.ini` or stop the conflicting service
- Use `lsof -i :5000` to find what's using the port
- Use `ss -tlnp | grep 8080` or `lsof -i :8080` (if available) to find what's using the port
### Permission Denied
```bash
+12
View File
@@ -405,6 +405,18 @@ copy_files_smart "$SCRIPT_DIR" "$INSTALL_DIR" || {
exit 1
}
# Write .version_info at install dir so web viewer and packet_capture show version after install
if command -v git &>/dev/null && [ -d "$SCRIPT_DIR/.git" ]; then
GIT_HASH="$(git -C "$SCRIPT_DIR" rev-parse --short HEAD 2>/dev/null || echo "unknown")"
if VERSION="$(git -C "$SCRIPT_DIR" describe --exact-match HEAD 2>/dev/null)"; then
INSTALLER_VER="$VERSION"
else
INSTALLER_VER="dev-${GIT_HASH}"
fi
printf '%s\n' "{\"installer_version\": \"${INSTALLER_VER}\", \"git_hash\": \"${GIT_HASH}\"}" > "$INSTALL_DIR/.version_info"
print_success "Wrote version info (${INSTALLER_VER}) to $INSTALL_DIR/.version_info"
fi
# If no config.ini in install dir, create it from config.ini.example
if [ ! -f "$INSTALL_DIR/config.ini" ]; then
if [ -f "$INSTALL_DIR/config.ini.example" ]; then
+9 -18
View File
@@ -280,11 +280,7 @@ class RepeaterCommand(BaseCommand):
# Force a complete refresh of contacts from device after purging
self.logger.info("Forcing contact list refresh from device to ensure persistence...")
try:
from meshcore_cli.meshcore_cli import next_cmd
await asyncio.wait_for(
next_cmd(self.bot.meshcore, ["contacts"]),
timeout=30.0
)
await self.bot.meshcore.commands.get_contacts()
self.logger.info("Contact list refreshed from device")
except Exception as e:
self.logger.warning(f"Failed to refresh contact list: {e}")
@@ -314,21 +310,16 @@ class RepeaterCommand(BaseCommand):
# Final verification: Check if contacts were actually removed from device
self.logger.info("Performing final verification of contact removal...")
try:
from meshcore_cli.meshcore_cli import next_cmd
await asyncio.wait_for(
next_cmd(self.bot.meshcore, ["contacts"]),
timeout=30.0
)
await self.bot.meshcore.commands.get_contacts()
# Count remaining repeaters on device
remaining_repeaters = 0
if hasattr(self.bot.meshcore, 'contacts'):
for contact_key, contact_data in self.bot.meshcore.contacts.items():
if self.bot.repeater_manager._is_repeater_device(contact_data):
remaining_repeaters += 1
remaining_repeaters = sum(
1 for contact_data in self.bot.meshcore.contacts.values()
if self.bot.repeater_manager._is_repeater_device(contact_data)
)
self.logger.info(f"Final verification: {remaining_repeaters} repeaters still on device")
except Exception as e:
self.logger.warning(f"Final verification failed: {e}")
+17 -1
View File
@@ -43,6 +43,22 @@ from .transmission_tracker import TransmissionTracker
from .utils import resolve_path
class _DuplicateAwareConfigParser(configparser.ConfigParser):
"""ConfigParser that allows duplicate options (last value wins) and logs ERROR when seen."""
def __init__(self, *args, **kwargs):
kwargs['strict'] = False
super().__init__(*args, **kwargs)
def _handle_option(self, st, line, fpname):
if st.optname in st.options:
logging.getLogger(__name__).error(
"Duplicate option in config file: section [%s], option '%s' (file %s, line %s). Using last value.",
st.sectname, st.optname, fpname, getattr(st, 'lineno', '?'),
)
st.options[st.optname] = st.optvalue
class MeshCoreBot:
"""MeshCore Bot using official meshcore package.
@@ -52,7 +68,7 @@ class MeshCoreBot:
def __init__(self, config_file: str = "config.ini"):
self.config_file = config_file
self.config = configparser.ConfigParser()
self.config = _DuplicateAwareConfigParser()
self.load_config()
# Setup logging
+208 -55
View File
@@ -6,6 +6,8 @@ Persists graph state across bot restarts for development scenarios.
"""
import sqlite3
import sys
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Set
@@ -24,29 +26,44 @@ class MeshGraph:
self.bot = bot
self.logger = bot.logger
self.db_manager = bot.db_manager
# Capture/validation feature flags
# graph_capture_enabled: controls whether new edge data is collected from packets
# When False, no new edges are added and the batch writer thread is not started.
self.capture_enabled = bot.config.getboolean('Path_Command', 'graph_capture_enabled', fallback=True)
# In-memory graph storage: {(from_prefix, to_prefix): edge_data}
self.edges: Dict[Tuple[str, str], Dict] = {}
# Adjacency indexes for O(1) neighbour lookups (derived from self.edges)
self._outgoing_index: Dict[str, Set[str]] = defaultdict(set) # from_prefix -> set of to_prefixes
self._incoming_index: Dict[str, Set[str]] = defaultdict(set) # to_prefix -> set of from_prefixes
# Per-edge last-notification timestamps for web viewer throttling (unix float)
self._notification_timestamps: Dict[Tuple[str, str], float] = {}
# Track pending updates for batched writes
self.pending_updates: Set[Tuple[str, str]] = set()
self.pending_lock = threading.Lock()
# Write strategy configuration
self.write_strategy = bot.config.get('Path_Command', 'graph_write_strategy', fallback='hybrid')
self.batch_interval = bot.config.getint('Path_Command', 'graph_batch_interval_seconds', fallback=30)
self.batch_max_pending = bot.config.getint('Path_Command', 'graph_batch_max_pending', fallback=100)
self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=0)
# Default 14 days: edges older than this carry near-zero recency confidence anyway.
# Set to 0 in config.ini to load all historical edges (e.g. on servers with ample RAM).
self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=14)
self.edge_expiration_days = bot.config.getint('Path_Command', 'graph_edge_expiration_days', fallback=7)
# Background task for batched writes
self._batch_task = None
self._shutdown_event = threading.Event()
# Load graph from database on startup
self._load_from_database()
# Start background batch writer if needed
if self.write_strategy in ('batched', 'hybrid'):
# Start background batch writer only when capture is active
if self.capture_enabled and self.write_strategy in ('batched', 'hybrid'):
self._start_batch_writer()
def _load_from_database(self):
@@ -58,42 +75,70 @@ class MeshGraph:
geographic_distance
FROM mesh_connections
'''
# Apply date filter if configured
# Build WHERE clause combining startup_load_days and edge_expiration_days.
# startup_load_days: explicit cap on how far back to load (0 = no cap).
# edge_expiration_days: always applied — never load edges we would immediately
# evict as expired (this bounds memory even when startup_load_days=0).
where_parts = []
if self.startup_load_days > 0:
cutoff_date = datetime.now() - timedelta(days=self.startup_load_days)
query += f" WHERE last_seen >= '{cutoff_date.isoformat()}'"
where_parts.append(f"last_seen >= '{cutoff_date.isoformat()}'")
if self.edge_expiration_days > 0:
expiry_date = datetime.now() - timedelta(days=self.edge_expiration_days)
where_parts.append(f"last_seen >= '{expiry_date.isoformat()}'")
if where_parts:
# Use the most restrictive (most recent) cutoff
query += " WHERE " + " AND ".join(where_parts)
query += " ORDER BY last_seen DESC"
results = self.db_manager.execute_query(query)
edge_count = 0
for row in results:
from_prefix = row['from_prefix']
to_prefix = row['to_prefix']
edge_key = (from_prefix, to_prefix)
# Intern public key strings so identical keys across many edges share
# a single string object in memory rather than duplicating bytes.
from_pk = row.get('from_public_key')
to_pk = row.get('to_public_key')
if from_pk:
from_pk = sys.intern(from_pk)
if to_pk:
to_pk = sys.intern(to_pk)
self.edges[edge_key] = {
'from_prefix': from_prefix,
'to_prefix': to_prefix,
'from_public_key': row.get('from_public_key'),
'to_public_key': row.get('to_public_key'),
'from_public_key': from_pk,
'to_public_key': to_pk,
'observation_count': row.get('observation_count', 1),
'first_seen': row.get('first_seen'),
'last_seen': row.get('last_seen'),
'avg_hop_position': row.get('avg_hop_position'),
'geographic_distance': row.get('geographic_distance')
}
# Maintain adjacency indexes
self._outgoing_index[from_prefix].add(to_prefix)
self._incoming_index[to_prefix].add(from_prefix)
edge_count += 1
self.logger.info(f"Loaded {edge_count} graph edges from database")
# Log statistics
if edge_count > 0:
total_observations = sum(e['observation_count'] for e in self.edges.values())
self.logger.info(f"Graph statistics: {edge_count} edges, {total_observations} total observations")
# Belt-and-suspenders: prune any edges that slipped through the SQL filter
# (e.g. timezone edge cases or edges loaded before expiration_days was set)
self.prune_expired_edges()
except Exception as e:
self.logger.warning(f"Error loading graph from database: {e}")
# Continue with empty graph
@@ -115,20 +160,30 @@ class MeshGraph:
"""
if not from_prefix or not to_prefix:
return
# Respect the capture kill-switch — allow reads but block writes
if not self.capture_enabled:
return
# Normalize prefixes to lowercase
from_prefix = from_prefix.lower()[:2]
to_prefix = to_prefix.lower()[:2]
# Intern public key strings so repeated identical keys share one object in RAM
if from_public_key:
from_public_key = sys.intern(from_public_key)
if to_public_key:
to_public_key = sys.intern(to_public_key)
edge_key = (from_prefix, to_prefix)
now = datetime.now()
# Update or create edge
if edge_key in self.edges:
edge = self.edges[edge_key]
edge['observation_count'] += 1
edge['last_seen'] = now
# Update average hop position
if hop_position is not None:
current_avg = edge.get('avg_hop_position')
@@ -139,21 +194,21 @@ class MeshGraph:
else:
# First time setting hop position
edge['avg_hop_position'] = hop_position
# Update public keys if provided (always update if we have a better key)
# This allows us to fill in missing keys on existing edges
if from_public_key:
edge['from_public_key'] = from_public_key
if to_public_key:
edge['to_public_key'] = to_public_key
# Update geographic distance if provided
if geographic_distance is not None:
edge['geographic_distance'] = geographic_distance
is_new_edge = False
else:
# New edge
# New edge — also update adjacency indexes
self.edges[edge_key] = {
'from_prefix': from_prefix,
'to_prefix': to_prefix,
@@ -165,6 +220,8 @@ class MeshGraph:
'avg_hop_position': hop_position if hop_position is not None else None,
'geographic_distance': geographic_distance
}
self._outgoing_index[from_prefix].add(to_prefix)
self._incoming_index[to_prefix].add(from_prefix)
is_new_edge = True
# Persist according to write strategy
@@ -193,18 +250,31 @@ class MeshGraph:
self._notify_web_viewer_edge(edge_key, is_new_edge)
def _notify_web_viewer_edge(self, edge_key: Tuple[str, str], is_new: bool):
"""Notify web viewer of edge update via bot integration"""
"""Notify web viewer of edge update via bot integration.
New edges always trigger an immediate notification. Updates to existing
edges are throttled to at most once every 10 seconds to reduce HTTP
traffic on busy meshes.
"""
try:
if not hasattr(self.bot, 'web_viewer_integration') or not self.bot.web_viewer_integration:
return
if not hasattr(self.bot.web_viewer_integration, 'bot_integration'):
return
edge = self.edges.get(edge_key)
if not edge:
return
# Throttle repeated updates for the same edge (new edges always notify)
now_ts = time.time()
if not is_new:
last_notified = self._notification_timestamps.get(edge_key, 0.0)
if (now_ts - last_notified) < 10.0:
return # Skip — notified recently enough
self._notification_timestamps[edge_key] = now_ts
# Prepare edge data for web viewer
edge_data = {
'from_prefix': edge['from_prefix'],
@@ -218,7 +288,7 @@ class MeshGraph:
'geographic_distance': edge.get('geographic_distance'),
'is_new': is_new
}
# Send update asynchronously
self.bot.web_viewer_integration.bot_integration.send_mesh_edge_update(edge_data)
except Exception as e:
@@ -434,39 +504,53 @@ class MeshGraph:
is_new: bool,
conn: Optional[sqlite3.Connection] = None,
location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
skip_distance_recalc: bool = False,
):
"""Write a single edge to the database.
Args:
edge_key: (from_prefix, to_prefix) tuple.
is_new: True if this is a new edge, False if updating existing.
conn: Optional existing DB connection for batch operations (caller commits).
location_cache: Optional cache for location lookups within a flush.
skip_distance_recalc: If True, skip distance recalculation (used by
_flush_pending_updates_sync which already recalculates before calling here).
"""
if edge_key not in self.edges:
return
edge = self.edges[edge_key]
# Recalculate distance using full public keys if available (more accurate)
# This fixes issues where prefix collisions cause wrong locations to be used
if edge.get('from_public_key') or edge.get('to_public_key'):
# Recalculate distance using full public keys if available (more accurate).
# Skipped when called from the batch flush loop, which already recalculated.
if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')):
recalculated_distance = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated_distance is not None:
edge['geographic_distance'] = recalculated_distance
self.logger.debug(f"Mesh graph: Recalculated distance for {edge_key} using public keys: {recalculated_distance:.1f} km")
try:
if is_new:
# Insert new edge
# Upsert new edge.
# Use INSERT ... ON CONFLICT DO UPDATE so that if the row already exists
# in the database (e.g. it was filtered out of the in-memory graph at
# startup by startup_load_days / edge_expiration_days, or written by a
# concurrent process), we merge rather than fail with UNIQUE constraint.
query = '''
INSERT INTO mesh_connections
INSERT INTO mesh_connections
(from_prefix, to_prefix, from_public_key, to_public_key,
observation_count, first_seen, last_seen, avg_hop_position,
geographic_distance)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(from_prefix, to_prefix) DO UPDATE SET
observation_count = MAX(observation_count, excluded.observation_count),
last_seen = MAX(last_seen, excluded.last_seen),
avg_hop_position = excluded.avg_hop_position,
geographic_distance = COALESCE(excluded.geographic_distance, geographic_distance),
from_public_key = COALESCE(excluded.from_public_key, from_public_key),
to_public_key = COALESCE(excluded.to_public_key, to_public_key)
'''
params = (
edge['from_prefix'],
@@ -480,10 +564,10 @@ class MeshGraph:
edge.get('geographic_distance')
)
else:
# Update existing edge - recalculate distance if we now have public keys
# Only update distance if we have at least one public key and current distance seems wrong
# Update existing edge recalculate distance if we now have public keys,
# but only when not already done by the caller (skip_distance_recalc=False).
current_distance = edge.get('geographic_distance')
if (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance:
if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance:
recalculated = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
@@ -581,6 +665,51 @@ class MeshGraph:
self.logger.debug(f"Error building update params for {edge_key}: {e}")
return None
def prune_expired_edges(self) -> int:
"""Remove edges from the in-memory graph that have exceeded graph_edge_expiration_days.
Only evicts from RAM — the database rows are kept so that historical data is
preserved and can be reloaded if the expiration window is later widened.
Returns:
int: Number of edges evicted.
"""
if self.edge_expiration_days <= 0:
return 0
cutoff = datetime.now() - timedelta(days=self.edge_expiration_days)
expired_keys = []
for edge_key, edge in self.edges.items():
last_seen = edge.get('last_seen')
if last_seen is None:
continue
if isinstance(last_seen, str):
try:
last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
except ValueError:
continue
if last_seen < cutoff:
expired_keys.append(edge_key)
for edge_key in expired_keys:
from_prefix, to_prefix = edge_key
del self.edges[edge_key]
# Clean up adjacency indexes
if from_prefix in self._outgoing_index:
self._outgoing_index[from_prefix].discard(to_prefix)
if not self._outgoing_index[from_prefix]:
del self._outgoing_index[from_prefix]
if to_prefix in self._incoming_index:
self._incoming_index[to_prefix].discard(from_prefix)
if not self._incoming_index[to_prefix]:
del self._incoming_index[to_prefix]
# Drop stale notification timestamp if present
self._notification_timestamps.pop(edge_key, None)
if expired_keys:
self.logger.debug(f"Pruned {len(expired_keys)} expired graph edges (older than {self.edge_expiration_days} days)")
return len(expired_keys)
def _start_batch_writer(self):
"""Start background task for batched writes."""
def batch_writer_loop():
@@ -589,7 +718,9 @@ class MeshGraph:
if not self._shutdown_event.is_set():
# Flush synchronously (database operations are synchronous)
self._flush_pending_updates_sync()
# Periodically evict expired edges from RAM
self.prune_expired_edges()
import threading
batch_thread = threading.Thread(target=batch_writer_loop, daemon=True)
batch_thread.start()
@@ -630,7 +761,9 @@ class MeshGraph:
(edge_key[0], edge_key[1]),
)
is_new = cursor.fetchone() is None
self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache)
# Distance was already recalculated above — tell _write_edge_to_db to skip it
self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache,
skip_distance_recalc=True)
if conn:
conn.commit()
except Exception as e:
@@ -684,27 +817,47 @@ class MeshGraph:
def get_outgoing_edges(self, prefix: str) -> List[Dict]:
"""Get all edges originating from a node.
Uses the adjacency index for O(1) lookup instead of a full scan.
Args:
prefix: Node prefix.
Returns:
List of edge dictionaries.
"""
prefix = prefix.lower()[:2]
return [edge for (f, t), edge in self.edges.items() if f == prefix]
to_prefixes = self._outgoing_index.get(prefix)
if not to_prefixes:
return []
result = []
for to_prefix in to_prefixes:
edge = self.edges.get((prefix, to_prefix))
if edge is not None:
result.append(edge)
return result
def get_incoming_edges(self, prefix: str) -> List[Dict]:
"""Get all edges ending at a node.
Uses the adjacency index for O(1) lookup instead of a full scan.
Args:
prefix: Node prefix.
Returns:
List of edge dictionaries.
"""
prefix = prefix.lower()[:2]
return [edge for (f, t), edge in self.edges.items() if t == prefix]
from_prefixes = self._incoming_index.get(prefix)
if not from_prefixes:
return []
result = []
for from_prefix in from_prefixes:
edge = self.edges.get((from_prefix, prefix))
if edge is not None:
result.append(edge)
return result
def validate_path_segment(self, from_prefix: str, to_prefix: str,
min_observations: int = 1,
+14 -7
View File
@@ -534,9 +534,12 @@ class MessageHandler:
advert_data['out_path'] = out_path
advert_data['out_path_len'] = out_path_len
# Update mesh graph with edges from the advert path
# Create edge from advertising device to first hop in path
if out_path and out_path_len > 0 and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph:
# Update mesh graph with edges from the advert path (one edge per hop).
# This can trigger many send_mesh_edge_update() calls in quick succession;
# if the web viewer is down, that produces a wave of connection-refused logs.
if (out_path and out_path_len > 0
and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph
and self.bot.mesh_graph.capture_enabled):
self._update_mesh_graph_from_advert(advert_data, out_path, out_path_len, packet_info)
# Store complete path in observed_paths table
@@ -1605,7 +1608,8 @@ class MessageHandler:
# Update mesh graph with trace path - bot is the destination, so we can confirm these edges
# Since the bot received this trace packet, it's the destination node
self._update_mesh_graph_from_trace(path_hashes, packet_info)
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph_from_trace(path_hashes, packet_info)
else:
path_string = "Direct" if hops == 0 else f"Unknown routing ({hops} hops)"
self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string}")
@@ -1619,7 +1623,8 @@ class MessageHandler:
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)")
# Update mesh graph with path edges
self._update_mesh_graph(path_nodes, packet_info)
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# Method 2: Try path_hex field
path_hex = packet_info.get('path_hex', '')
@@ -1629,7 +1634,8 @@ class MessageHandler:
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET HEX: {path_string} ({hops} hops)")
# Update mesh graph with path edges
self._update_mesh_graph(path_nodes, packet_info)
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# Method 3: Try path_info.path field
path_info = packet_info.get('path_info', {})
@@ -1638,7 +1644,8 @@ class MessageHandler:
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PATH_INFO: {path_string} ({hops} hops)")
# Update mesh graph with path edges
self._update_mesh_graph(path_nodes, packet_info)
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# No path found - this is truly unknown
path_string = "Direct" if hops == 0 else "Unknown routing"
File diff suppressed because it is too large Load Diff
@@ -231,6 +231,14 @@ class PacketCaptureService(BaseServicePlugin):
broker_num += 1
continue
# Parse upload_packet_types: comma-separated list (e.g. "2,4"); empty/unset = upload all
upload_types_raw = config.get('PacketCapture', f'mqtt{broker_num}_upload_packet_types', fallback='').strip()
upload_packet_types = None
if upload_types_raw:
upload_packet_types = frozenset(t.strip() for t in upload_types_raw.split(',') if t.strip())
if not upload_packet_types:
upload_packet_types = None
broker = {
'enabled': True,
'host': config.get('PacketCapture', server_key, fallback='localhost'),
@@ -245,7 +253,8 @@ class PacketCaptureService(BaseServicePlugin):
'transport': config.get('PacketCapture', f'mqtt{broker_num}_transport', fallback='tcp').lower(),
'use_tls': config.getboolean('PacketCapture', f'mqtt{broker_num}_use_tls', fallback=False),
'websocket_path': config.get('PacketCapture', f'mqtt{broker_num}_websocket_path', fallback='/mqtt'),
'client_id': config.get('PacketCapture', f'mqtt{broker_num}_client_id', fallback=None)
'client_id': config.get('PacketCapture', f'mqtt{broker_num}_client_id', fallback=None),
'upload_packet_types': upload_packet_types,
}
# Set default topic_prefix if not set
@@ -806,7 +815,8 @@ class PacketCaptureService(BaseServicePlugin):
publish_metrics = await self.publish_packet_mqtt(formatted_packet)
# Log DEBUG level for each packet (verbose; use INFO only for service lifecycle)
self.logger.debug(f"📦 Captured packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")
action = "Skipping" if publish_metrics.get("skipped_by_filter") else "Captured"
self.logger.debug(f"📦 {action} packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")
# Output full packet data structure in debug mode only (matches original script)
if self.debug:
@@ -1277,20 +1287,21 @@ class PacketCaptureService(BaseServicePlugin):
return topic
async def publish_packet_mqtt(self, packet_info: Dict[str, Any]) -> Dict[str, int]:
"""Publish packet to MQTT - returns metrics dict with 'attempted' and 'succeeded' counts.
async def publish_packet_mqtt(self, packet_info: Dict[str, Any]) -> Dict[str, Any]:
"""Publish packet to MQTT - returns metrics dict with attempted/succeeded/skipped_by_filter.
Args:
packet_info: Formatted packet dictionary.
Returns:
Dict[str, int]: Dictionary with 'attempted' and 'succeeded' counts.
Dict with 'attempted', 'succeeded' counts and 'skipped_by_filter' (True when
packet type was excluded by mqttN_upload_packet_types for all connected brokers).
"""
# Always log when function is called (helps diagnose if it's not being invoked)
self.logger.debug(f"publish_packet_mqtt called (packet {self.packet_count}, {len(self.mqtt_clients)} clients)")
# Initialize metrics
metrics = {"attempted": 0, "succeeded": 0}
# Initialize metrics (skipped_by_filter: True when packet type excluded by upload_packet_types)
metrics = {"attempted": 0, "succeeded": 0, "skipped_by_filter": False}
# Check per-broker connection status (more accurate than global flag)
# Don't use early return - let the loop check each broker individually
@@ -1309,7 +1320,16 @@ class PacketCaptureService(BaseServicePlugin):
try:
client = mqtt_client_info['client']
config = mqtt_client_info['config']
# Per-broker packet type filter: if set, only upload listed types (e.g. 2,4 = TXT_MSG, ADVERT)
upload_types = config.get('upload_packet_types')
if upload_types is not None and packet_info.get('packet_type', '') not in upload_types:
metrics["skipped_by_filter"] = True
self.logger.debug(
f"Skipping MQTT broker {config.get('host', 'unknown')} (packet type {packet_info.get('packet_type')} not in {sorted(upload_types)})"
)
continue
# Determine topic
topic = None
if config.get('topic_packets'):
+64 -2
View File
@@ -9,6 +9,7 @@ import json
import time
import configparser
import logging
import subprocess
import threading
from datetime import datetime, timedelta, date
from flask import Flask, render_template, jsonify, request, send_from_directory, make_response
@@ -85,6 +86,9 @@ class BotDataViewer:
use_db = bot_db
self.db_path = str(resolve_path(use_db, self.bot_root))
# Version info for footer (tag or branch/commit/date); computed once at startup
self._version_info = self._get_version_info()
# Setup template context processor for global template variables
self._setup_template_context()
@@ -148,8 +152,61 @@ class BotDataViewer:
config.read(config_path)
return config
def _get_version_info(self) -> Dict[str, Optional[str]]:
"""Get version info for footer: tag if on a tag, else branch, commit hash and date.
Checks MESHCORE_BOT_VERSION env (Docker/build), then .version_info, then git. Never raises."""
out = {"tag": None, "branch": None, "commit": None, "date": None}
# Docker / CI: version set at build time (e.g. ARG + ENV in Dockerfile)
env_version = os.environ.get("MESHCORE_BOT_VERSION", "").strip()
if env_version:
out["tag"] = env_version if env_version.startswith("v") else f"v{env_version}"
return out
version_file = self.bot_root / ".version_info"
try:
if version_file.is_file():
with open(version_file, "r") as f:
data = json.load(f)
# Installer/tag installs write installer_version (often the tag name)
tag = data.get("installer_version") or data.get("tag")
if tag:
out["tag"] = tag if tag.startswith("v") else f"v{tag}"
return out
except (OSError, json.JSONDecodeError, KeyError):
pass
try:
def run(cmd: List[str]) -> Optional[str]:
args = ["git", "-C", str(self.bot_root)] + cmd
result = subprocess.run(
args, capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return None
return (result.stdout or "").strip() or None
# Check if HEAD is a tag
tag = run(["describe", "--exact-match", "HEAD"])
if tag:
out["tag"] = tag if tag.startswith("v") else f"v{tag}"
return out
branch = run(["rev-parse", "--abbrev-ref", "HEAD"])
commit = run(["rev-parse", "--short", "HEAD"])
date_raw = run(["show", "-s", "--format=%ci", "HEAD"])
out["branch"] = branch or None
out["commit"] = commit or None
if date_raw:
try:
# %ci is "YYYY-MM-DD HH:MM:SS +tz"; take date part only
out["date"] = date_raw.split()[0]
except IndexError:
out["date"] = date_raw
return out
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError, OSError):
return out
def _setup_template_context(self):
"""Setup template context processor to inject global variables"""
version_info = self._version_info
@self.app.context_processor
def inject_template_vars():
"""Inject variables available to all templates. Never raises so templates always render."""
@@ -166,10 +223,15 @@ class BotDataViewer:
bot_name = (self.config.get('Bot', 'bot_name', fallback='MeshCore Bot') or '').strip() or 'MeshCore Bot'
except (configparser.NoSectionError, configparser.NoOptionError):
bot_name = 'MeshCore Bot'
return dict(greeter_enabled=greeter_enabled, feed_manager_enabled=feed_manager_enabled, bot_name=bot_name)
return dict(
greeter_enabled=greeter_enabled,
feed_manager_enabled=feed_manager_enabled,
bot_name=bot_name,
version_info=version_info,
)
except Exception as e:
self.logger.exception("Template context processor failed: %s", e)
return dict(greeter_enabled=False, feed_manager_enabled=False, bot_name='MeshCore Bot')
return dict(greeter_enabled=False, feed_manager_enabled=False, bot_name='MeshCore Bot', version_info=version_info)
def _get_db_path(self):
"""Get the database path, falling back to [Bot] db_path if [Web_Viewer] db_path is unset"""
+43 -15
View File
@@ -17,10 +17,15 @@ from ..utils import resolve_path
class BotIntegration:
"""Simple bot integration for web viewer compatibility"""
# After this many consecutive connection failures, stop sending until cooldown expires
CIRCUIT_BREAKER_THRESHOLD = 3
CIRCUIT_BREAKER_COOLDOWN_SEC = 60
def __init__(self, bot):
self.bot = bot
self.circuit_breaker_open = False
self.circuit_breaker_failures = 0
self.circuit_breaker_last_failure_time = 0.0
self.is_shutting_down = False
# Initialize HTTP session with connection pooling for efficient reuse
self._init_http_session()
@@ -36,11 +41,10 @@ class BotIntegration:
import urllib3
import logging
# Suppress urllib3 connection pool debug messages
# "Resetting dropped connection" is expected behavior when connections are idle
# and the connection pool is working correctly
# Suppress urllib3 connection pool messages when web viewer is unreachable
# Connection refused / Retrying WARNINGs would flood logs during routing bursts
urllib3_logger = logging.getLogger('urllib3.connectionpool')
urllib3_logger.setLevel(logging.INFO) # Suppress DEBUG messages
urllib3_logger.setLevel(logging.ERROR)
# Also disable other urllib3 warnings
urllib3.disable_warnings(urllib3.exceptions.NotOpenSSLWarning)
@@ -80,6 +84,30 @@ class BotIntegration:
"""Reset the circuit breaker"""
self.circuit_breaker_open = False
self.circuit_breaker_failures = 0
def _should_skip_web_viewer_send(self):
"""Return True if we should skip sending (circuit open and within cooldown)."""
if not self.circuit_breaker_open:
return False
if (time.time() - self.circuit_breaker_last_failure_time) >= self.CIRCUIT_BREAKER_COOLDOWN_SEC:
self.reset_circuit_breaker()
return False
return True
def _record_web_viewer_result(self, success):
"""Update circuit breaker state after a send attempt."""
if success:
self.reset_circuit_breaker()
else:
self.circuit_breaker_failures += 1
self.circuit_breaker_last_failure_time = time.time()
if self.circuit_breaker_failures >= self.CIRCUIT_BREAKER_THRESHOLD:
self.circuit_breaker_open = True
self.bot.logger.debug(
"Web viewer unreachable after %d failures; circuit open for %ds",
self.circuit_breaker_failures,
self.CIRCUIT_BREAKER_COOLDOWN_SEC,
)
def _get_web_viewer_db_path(self):
"""Return resolved database path for web viewer. Uses [Bot] db_path when [Web_Viewer] db_path is unset."""
@@ -325,6 +353,8 @@ class BotIntegration:
def send_mesh_edge_update(self, edge_data):
"""Send mesh edge update to web viewer via HTTP API"""
try:
if self._should_skip_web_viewer_send():
return
# Get web viewer URL from config
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
@@ -338,28 +368,27 @@ class BotIntegration:
# Use session with connection pooling if available, otherwise fallback to requests.post
if self.http_session:
try:
# Use a slightly longer timeout to allow connection reuse
self.http_session.post(url, json=payload, timeout=1.0)
self._record_web_viewer_result(True)
except Exception:
# Silently fail - web viewer might not be running
pass
self._record_web_viewer_result(False)
else:
# Fallback if session not initialized
import requests
try:
requests.post(url, json=payload, timeout=1.0)
self._record_web_viewer_result(True)
except Exception:
pass
self._record_web_viewer_result(False)
except Exception as e:
self.bot.logger.debug(f"Error sending mesh edge update to web viewer: {e}")
def send_mesh_node_update(self, node_data):
"""Send mesh node update to web viewer via HTTP API"""
try:
if self._should_skip_web_viewer_send():
return
import requests
import json
# Get web viewer URL from config
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
url = f"http://{host}:{port}/api/stream_data"
@@ -369,12 +398,11 @@ class BotIntegration:
'data': node_data
}
# Send asynchronously (don't block)
try:
requests.post(url, json=payload, timeout=0.5)
self._record_web_viewer_result(True)
except Exception:
# Silently fail - web viewer might not be running
pass
self._record_web_viewer_result(False)
except Exception as e:
self.bot.logger.debug(f"Error sending mesh node update to web viewer: {e}")
+9
View File
@@ -427,6 +427,15 @@
<!-- Footer links (bottom of page, visible when scrolled down) -->
<footer class="text-center py-2" style="font-size: 0.7rem;">
<a href="https://github.com/agessaman/meshcore-bot" target="_blank" rel="noopener noreferrer" style="color: var(--text-muted, #6c757d); text-decoration: none;">meshcore-bot</a>
{% if version_info.tag %}
<span class="mx-1" style="color: var(--text-muted, #6c757d);"> {{ version_info.tag }}</span>
{% elif version_info.branch or version_info.commit or version_info.date %}
<span class="mx-1" style="color: var(--text-muted, #6c757d);">
{% if version_info.branch %}{{ version_info.branch }}{% endif %}
{% if version_info.commit %}{% if version_info.branch %} · {% endif %}<code class="text-muted" style="font-size: inherit;">{{ version_info.commit }}</code>{% endif %}
{% if version_info.date %}{% if version_info.branch or version_info.commit %} · {% endif %}{{ version_info.date }}{% endif %}
</span>
{% endif %}
<span class="mx-1" style="color: var(--text-muted, #6c757d);"> for </span>
<a href="https://meshcore.co.uk/index.html" target="_blank" rel="noopener noreferrer" style="color: var(--text-muted, #6c757d); text-decoration: none;">MeshCore</a>
</footer>
+1
View File
@@ -124,6 +124,7 @@ def test_config():
config.set('Path_Command', 'graph_batch_max_pending', '100')
config.set('Path_Command', 'graph_startup_load_days', '0') # Don't load old data in tests
config.set('Path_Command', 'graph_edge_expiration_days', '7')
config.set('Path_Command', 'graph_capture_enabled', 'true')
config.set('Path_Command', 'graph_use_bidirectional', 'true')
config.set('Path_Command', 'graph_use_hop_position', 'true')
config.set('Path_Command', 'graph_multi_hop_enabled', 'true')
+338
View File
@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""
Unit tests for MeshGraph performance optimizations.
Covers the optimizations added for low-memory devices (Raspberry Pi Zero 2 W):
- Adjacency indexes (_outgoing_index / _incoming_index) for O(1) lookups
- sys.intern() public-key string deduplication
- prune_expired_edges() and edge expiration SQL filter
- Web-viewer notification throttle (_notification_timestamps)
- capture_enabled flag (graph_capture_enabled config setting)
"""
import time
import sqlite3
import pytest
from datetime import datetime, timedelta
from unittest.mock import MagicMock
from modules.mesh_graph import MeshGraph
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _make_key(prefix: str) -> str:
"""Generate a deterministic 64-char hex public key from a 2-char prefix."""
return (prefix.lower() * 32)[:64]
# ---------------------------------------------------------------------------
# 1. Adjacency Indexes
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestAdjacencyIndexes:
"""Verify that _outgoing_index and _incoming_index are maintained correctly."""
def test_index_populated_on_add_edge(self, mesh_graph):
"""Adding an edge must update both adjacency indexes."""
mesh_graph.add_edge('ab', 'cd')
assert 'cd' in mesh_graph._outgoing_index['ab']
assert 'ab' in mesh_graph._incoming_index['cd']
def test_index_not_duplicated_on_update(self, mesh_graph):
"""Updating an existing edge must not add duplicate entries to the sets."""
mesh_graph.add_edge('ab', 'cd')
mesh_graph.add_edge('ab', 'cd') # second call is an update
assert len(mesh_graph._outgoing_index['ab']) == 1
assert len(mesh_graph._incoming_index['cd']) == 1
def test_get_outgoing_edges_uses_index(self, mesh_graph):
"""get_outgoing_edges() must return all edges from a node via the index."""
mesh_graph.add_edge('ab', 'cd')
mesh_graph.add_edge('ab', 'ef')
result = mesh_graph.get_outgoing_edges('ab')
assert len(result) == 2
to_prefixes = {e['to_prefix'] for e in result}
assert to_prefixes == {'cd', 'ef'}
def test_get_incoming_edges_uses_index(self, mesh_graph):
"""get_incoming_edges() must return all edges to a node via the index."""
mesh_graph.add_edge('ab', 'ef')
mesh_graph.add_edge('cd', 'ef')
result = mesh_graph.get_incoming_edges('ef')
assert len(result) == 2
from_prefixes = {e['from_prefix'] for e in result}
assert from_prefixes == {'ab', 'cd'}
def test_get_outgoing_edges_empty_for_unknown_prefix(self, mesh_graph):
"""get_outgoing_edges() for an unknown prefix must return [] without raising."""
result = mesh_graph.get_outgoing_edges('zz')
assert result == []
def test_get_incoming_edges_empty_for_unknown_prefix(self, mesh_graph):
"""get_incoming_edges() for an unknown prefix must return [] without raising."""
result = mesh_graph.get_incoming_edges('zz')
assert result == []
def test_index_consistent_with_edges_dict(self, mesh_graph):
"""Every (from, to) pair in self.edges must be reflected in both indexes."""
mesh_graph.add_edge('ab', 'cd')
mesh_graph.add_edge('ab', 'ef')
mesh_graph.add_edge('cd', 'ef')
for (from_p, to_p) in mesh_graph.edges:
assert to_p in mesh_graph._outgoing_index[from_p], \
f"Missing {to_p} in _outgoing_index[{from_p}]"
assert from_p in mesh_graph._incoming_index[to_p], \
f"Missing {from_p} in _incoming_index[{to_p}]"
# ---------------------------------------------------------------------------
# 2. Public Key Interning
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestPublicKeyInterning:
"""Verify that sys.intern() causes identical public-key strings to share identity."""
def test_same_key_shared_across_edges(self, mesh_graph):
"""Identical public keys stored on different edges must be the same object."""
shared_key = _make_key('ab')
mesh_graph.add_edge('ab', 'cd', from_public_key=shared_key)
mesh_graph.add_edge('ab', 'ef', from_public_key=shared_key)
edge1 = mesh_graph.get_edge('ab', 'cd')
edge2 = mesh_graph.get_edge('ab', 'ef')
# 'is' checks object identity — only true if sys.intern() is working
assert edge1['from_public_key'] is edge2['from_public_key']
def test_interning_does_not_alter_value(self, mesh_graph):
"""sys.intern() must not change the string's value."""
key = _make_key('ab')
mesh_graph.add_edge('ab', 'cd', from_public_key=key)
edge = mesh_graph.get_edge('ab', 'cd')
assert edge['from_public_key'] == key
# ---------------------------------------------------------------------------
# 3. Edge Expiration / prune_expired_edges()
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestEdgeExpiration:
"""Verify that prune_expired_edges() correctly evicts stale edges from RAM."""
def _add_expired_edge(self, mesh_graph, from_p, to_p, days_old=10):
"""Add an edge and manually back-date its last_seen."""
mesh_graph.add_edge(from_p, to_p)
edge_key = (from_p, to_p)
mesh_graph.edges[edge_key]['last_seen'] = datetime.now() - timedelta(days=days_old)
def test_prune_removes_expired_edge_from_edges(self, mesh_graph):
"""An edge older than expiration_days must be removed from self.edges."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
assert ('ab', 'cd') in mesh_graph.edges # sanity
mesh_graph.prune_expired_edges()
assert ('ab', 'cd') not in mesh_graph.edges
def test_prune_removes_expired_edge_from_outgoing_index(self, mesh_graph):
"""Pruned edge must be removed from _outgoing_index."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
mesh_graph.prune_expired_edges()
assert 'cd' not in mesh_graph._outgoing_index.get('ab', set())
def test_prune_removes_expired_edge_from_incoming_index(self, mesh_graph):
"""Pruned edge must be removed from _incoming_index."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
mesh_graph.prune_expired_edges()
assert 'ab' not in mesh_graph._incoming_index.get('cd', set())
def test_prune_keeps_fresh_edge(self, mesh_graph):
"""An edge with a recent last_seen must NOT be pruned."""
mesh_graph.add_edge('ab', 'cd') # last_seen = now
mesh_graph.prune_expired_edges()
assert ('ab', 'cd') in mesh_graph.edges
def test_prune_cleans_notification_timestamp(self, mesh_graph):
"""prune_expired_edges() must also clean up the notification timestamp entry."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
mesh_graph._notification_timestamps[('ab', 'cd')] = time.time()
mesh_graph.prune_expired_edges()
assert ('ab', 'cd') not in mesh_graph._notification_timestamps
def test_prune_removes_empty_index_entries(self, mesh_graph):
"""When the last edge for a prefix is pruned, the index key must be removed."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
mesh_graph.prune_expired_edges()
assert 'ab' not in mesh_graph._outgoing_index
assert 'cd' not in mesh_graph._incoming_index
def test_prune_returns_count_of_removed_edges(self, mesh_graph):
"""prune_expired_edges() must return the number of edges it removed."""
self._add_expired_edge(mesh_graph, 'ab', 'cd')
self._add_expired_edge(mesh_graph, 'ab', 'ef')
mesh_graph.add_edge('ab', 'gh') # fresh — should NOT be pruned
count = mesh_graph.prune_expired_edges()
assert count == 2
def test_prune_disabled_when_expiration_days_zero(self, mesh_graph):
"""When edge_expiration_days == 0, prune_expired_edges() must do nothing."""
mesh_graph.edge_expiration_days = 0
self._add_expired_edge(mesh_graph, 'ab', 'cd')
count = mesh_graph.prune_expired_edges()
assert count == 0
assert ('ab', 'cd') in mesh_graph.edges
def test_startup_sql_filter_excludes_expired_edges(self, mock_bot):
"""MeshGraph.__init__ must not load edges older than edge_expiration_days."""
# Insert one expired row and one fresh row directly into the DB
db_path = mock_bot.db_manager.db_path
expired_ts = (datetime.now() - timedelta(days=30)).isoformat()
fresh_ts = datetime.now().isoformat()
with sqlite3.connect(db_path) as conn:
conn.execute(
'''INSERT INTO mesh_connections
(from_prefix, to_prefix, observation_count, first_seen, last_seen)
VALUES (?, ?, ?, ?, ?)''',
('aa', 'bb', 5, expired_ts, expired_ts),
)
conn.execute(
'''INSERT INTO mesh_connections
(from_prefix, to_prefix, observation_count, first_seen, last_seen)
VALUES (?, ?, ?, ?, ?)''',
('cc', 'dd', 3, fresh_ts, fresh_ts),
)
conn.commit()
# graph_edge_expiration_days = 7 is set in the test_config fixture
graph = MeshGraph(mock_bot)
assert ('aa', 'bb') not in graph.edges, "Expired edge should not have been loaded"
assert ('cc', 'dd') in graph.edges, "Fresh edge must be loaded"
# ---------------------------------------------------------------------------
# 4. Web Viewer Notification Throttle
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestNotificationThrottle:
"""Verify that _notify_web_viewer_edge() throttles repeated update notifications."""
@pytest.fixture
def notifying_graph(self, mock_bot):
"""MeshGraph with a mock web_viewer_integration so notifications are trackable."""
web_vi = MagicMock()
web_vi.bot_integration = MagicMock()
web_vi.bot_integration.send_mesh_edge_update = MagicMock()
mock_bot.web_viewer_integration = web_vi
mock_bot.config.set('Path_Command', 'graph_write_strategy', 'immediate')
graph = MeshGraph(mock_bot)
return graph
def _notification_count(self, graph):
return graph.bot.web_viewer_integration.bot_integration.send_mesh_edge_update.call_count
def test_new_edge_always_notifies(self, notifying_graph):
"""A brand-new edge must trigger an immediate notification."""
notifying_graph.add_edge('ab', 'cd')
assert self._notification_count(notifying_graph) == 1
def test_repeated_update_within_window_skips_notification(self, notifying_graph):
"""A second add_edge() call within the 10-second window must NOT notify again."""
notifying_graph.add_edge('ab', 'cd') # new edge → notifies
notifying_graph.add_edge('ab', 'cd') # update → throttled
assert self._notification_count(notifying_graph) == 1
def test_update_after_throttle_window_notifies(self, notifying_graph):
"""An update after the 10-second throttle window has passed MUST notify."""
notifying_graph.add_edge('ab', 'cd') # new edge → notifies
# Backdate the stored timestamp to simulate 11 seconds having elapsed
notifying_graph._notification_timestamps[('ab', 'cd')] = time.time() - 11.0
notifying_graph.add_edge('ab', 'cd') # update after window → should notify
assert self._notification_count(notifying_graph) == 2
def test_throttle_is_per_edge(self, notifying_graph):
"""Each edge has its own throttle; two new edges must each notify once."""
notifying_graph.add_edge('ab', 'cd')
notifying_graph.add_edge('ab', 'ef')
assert self._notification_count(notifying_graph) == 2
# ---------------------------------------------------------------------------
# 5. capture_enabled Flag
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestCaptureEnabled:
"""Verify that graph_capture_enabled controls data collection and thread startup."""
def test_capture_enabled_by_default(self, mesh_graph):
"""capture_enabled must be True when not explicitly configured."""
assert mesh_graph.capture_enabled is True
def test_capture_disabled_prevents_add_edge(self, mesh_graph):
"""When capture_enabled is False, add_edge() must be a no-op."""
mesh_graph.capture_enabled = False
mesh_graph.add_edge('ab', 'cd')
assert mesh_graph.get_edge('ab', 'cd') is None
def test_capture_disabled_leaves_indexes_empty(self, mesh_graph):
"""When capture is off, no index entries must be created."""
mesh_graph.capture_enabled = False
mesh_graph.add_edge('ab', 'cd')
assert 'cd' not in mesh_graph._outgoing_index.get('ab', set())
assert 'ab' not in mesh_graph._incoming_index.get('cd', set())
def test_capture_disabled_from_config(self, mock_bot):
"""MeshGraph must read graph_capture_enabled = false from config."""
mock_bot.config.set('Path_Command', 'graph_capture_enabled', 'false')
graph = MeshGraph(mock_bot)
assert graph.capture_enabled is False
def test_capture_disabled_no_batch_thread_started(self, mock_bot):
"""When capture is off, the background batch writer thread must NOT start."""
mock_bot.config.set('Path_Command', 'graph_capture_enabled', 'false')
mock_bot.config.set('Path_Command', 'graph_write_strategy', 'batched')
graph = MeshGraph(mock_bot)
# Either _batch_thread was never set or it is None
batch_thread = getattr(graph, '_batch_thread', None)
assert batch_thread is None, "Batch writer thread should not start when capture is disabled"