From 5b7250cdd4f855dc16aa4c82af6c2724ce814189 Mon Sep 17 00:00:00 2001 From: agessaman Date: Wed, 18 Feb 2026 15:09:53 -0800 Subject: [PATCH 01/10] Enhance graph data handling and configuration options - Updated `config.ini.example` to include new settings for edge loading and graph data capture, providing clearer guidance on usage. - Modified `MeshGraph` class to respect the new `graph_capture_enabled` setting, controlling the collection of edge data from incoming packets. - Adjusted message handling to ensure graph updates only occur when data capture is enabled, optimizing performance and resource usage. - Added tests to validate the new configuration options, ensuring proper functionality in various scenarios. --- config.ini.example | 20 +- modules/mesh_graph.py | 250 +++++++++++---- modules/message_handler.py | 16 +- tests/conftest.py | 1 + tests/unit/test_mesh_graph_optimizations.py | 338 ++++++++++++++++++++ 5 files changed, 563 insertions(+), 62 deletions(-) create mode 100644 tests/unit/test_mesh_graph_optimizations.py diff --git a/config.ini.example b/config.ini.example index 0c510e1..1b80872 100644 --- a/config.ini.example +++ b/config.ini.example @@ -801,11 +801,25 @@ graph_path_validation_max_bonus = 0.3 # Lower values = stronger bonus from observation count. 50.0 means 50 observations = 0.15 bonus graph_path_validation_obs_divisor = 50.0 -# Load only recent edges on startup (days, 0 = load all) -# Useful for large graphs. Set to 0 to load all edges, or N to load only edges seen in last N days. -# For development with frequent restarts, 0 (load all) is recommended to maintain graph quality +# Load only recent edges on startup (days, 0 = load all historical edges) +# Edges older than this are skipped at startup to bound initial memory usage. +# The in-code default is 14 days when this setting is absent from config.ini. +# Recommended values: +# 0 - Load all historical edges (servers with ample RAM, e.g. x86 VM) +# 14 - Good balance of coverage vs. memory (default for unconfigured installs) +# 7 - Reduced memory footprint for Raspberry Pi Zero 2 W +# Note: edges older than graph_edge_expiration_days are never loaded regardless of this value. graph_startup_load_days = 0 +# Enable graph data capture from incoming packets (default: true) +# When true, the bot observes routing paths from advertisements, messages, and trace +# packets and stores edges in the mesh graph. +# When false, NO new edge data is collected and the background batch writer thread is +# not started — reducing both CPU and RAM overhead. Any edges already in the database +# are still available for graph_based_validation if that is also enabled. +# Set to false on devices that don't use the path command and want minimal overhead. +graph_capture_enabled = true + # Star bias multiplier for path command # When a contact is starred in the web viewer, multiply its selection score by this value # Higher values = stronger preference for starred repeaters diff --git a/modules/mesh_graph.py b/modules/mesh_graph.py index 9ca7a51..7934641 100644 --- a/modules/mesh_graph.py +++ b/modules/mesh_graph.py @@ -6,6 +6,8 @@ Persists graph state across bot restarts for development scenarios. """ import sqlite3 +import sys +import time import threading from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Set @@ -24,29 +26,44 @@ class MeshGraph: self.bot = bot self.logger = bot.logger self.db_manager = bot.db_manager - + + # Capture/validation feature flags + # graph_capture_enabled: controls whether new edge data is collected from packets + # When False, no new edges are added and the batch writer thread is not started. + self.capture_enabled = bot.config.getboolean('Path_Command', 'graph_capture_enabled', fallback=True) + # In-memory graph storage: {(from_prefix, to_prefix): edge_data} self.edges: Dict[Tuple[str, str], Dict] = {} - + + # Adjacency indexes for O(1) neighbour lookups (derived from self.edges) + self._outgoing_index: Dict[str, Set[str]] = defaultdict(set) # from_prefix -> set of to_prefixes + self._incoming_index: Dict[str, Set[str]] = defaultdict(set) # to_prefix -> set of from_prefixes + + # Per-edge last-notification timestamps for web viewer throttling (unix float) + self._notification_timestamps: Dict[Tuple[str, str], float] = {} + # Track pending updates for batched writes self.pending_updates: Set[Tuple[str, str]] = set() self.pending_lock = threading.Lock() - + # Write strategy configuration self.write_strategy = bot.config.get('Path_Command', 'graph_write_strategy', fallback='hybrid') self.batch_interval = bot.config.getint('Path_Command', 'graph_batch_interval_seconds', fallback=30) self.batch_max_pending = bot.config.getint('Path_Command', 'graph_batch_max_pending', fallback=100) - self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=0) - + # Default 14 days: edges older than this carry near-zero recency confidence anyway. + # Set to 0 in config.ini to load all historical edges (e.g. on servers with ample RAM). + self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=14) + self.edge_expiration_days = bot.config.getint('Path_Command', 'graph_edge_expiration_days', fallback=7) + # Background task for batched writes self._batch_task = None self._shutdown_event = threading.Event() - + # Load graph from database on startup self._load_from_database() - - # Start background batch writer if needed - if self.write_strategy in ('batched', 'hybrid'): + + # Start background batch writer only when capture is active + if self.capture_enabled and self.write_strategy in ('batched', 'hybrid'): self._start_batch_writer() def _load_from_database(self): @@ -58,42 +75,70 @@ class MeshGraph: geographic_distance FROM mesh_connections ''' - - # Apply date filter if configured + + # Build WHERE clause combining startup_load_days and edge_expiration_days. + # startup_load_days: explicit cap on how far back to load (0 = no cap). + # edge_expiration_days: always applied — never load edges we would immediately + # evict as expired (this bounds memory even when startup_load_days=0). + where_parts = [] if self.startup_load_days > 0: cutoff_date = datetime.now() - timedelta(days=self.startup_load_days) - query += f" WHERE last_seen >= '{cutoff_date.isoformat()}'" - + where_parts.append(f"last_seen >= '{cutoff_date.isoformat()}'") + if self.edge_expiration_days > 0: + expiry_date = datetime.now() - timedelta(days=self.edge_expiration_days) + where_parts.append(f"last_seen >= '{expiry_date.isoformat()}'") + if where_parts: + # Use the most restrictive (most recent) cutoff + query += " WHERE " + " AND ".join(where_parts) + query += " ORDER BY last_seen DESC" - + results = self.db_manager.execute_query(query) - + edge_count = 0 for row in results: from_prefix = row['from_prefix'] to_prefix = row['to_prefix'] edge_key = (from_prefix, to_prefix) - + + # Intern public key strings so identical keys across many edges share + # a single string object in memory rather than duplicating bytes. + from_pk = row.get('from_public_key') + to_pk = row.get('to_public_key') + if from_pk: + from_pk = sys.intern(from_pk) + if to_pk: + to_pk = sys.intern(to_pk) + self.edges[edge_key] = { 'from_prefix': from_prefix, 'to_prefix': to_prefix, - 'from_public_key': row.get('from_public_key'), - 'to_public_key': row.get('to_public_key'), + 'from_public_key': from_pk, + 'to_public_key': to_pk, 'observation_count': row.get('observation_count', 1), 'first_seen': row.get('first_seen'), 'last_seen': row.get('last_seen'), 'avg_hop_position': row.get('avg_hop_position'), 'geographic_distance': row.get('geographic_distance') } + + # Maintain adjacency indexes + self._outgoing_index[from_prefix].add(to_prefix) + self._incoming_index[to_prefix].add(from_prefix) + edge_count += 1 - + self.logger.info(f"Loaded {edge_count} graph edges from database") - + # Log statistics if edge_count > 0: total_observations = sum(e['observation_count'] for e in self.edges.values()) self.logger.info(f"Graph statistics: {edge_count} edges, {total_observations} total observations") - + + # Belt-and-suspenders: prune any edges that slipped through the SQL filter + # (e.g. timezone edge cases or edges loaded before expiration_days was set) + self.prune_expired_edges() + except Exception as e: self.logger.warning(f"Error loading graph from database: {e}") # Continue with empty graph @@ -115,20 +160,30 @@ class MeshGraph: """ if not from_prefix or not to_prefix: return - + + # Respect the capture kill-switch — allow reads but block writes + if not self.capture_enabled: + return + # Normalize prefixes to lowercase from_prefix = from_prefix.lower()[:2] to_prefix = to_prefix.lower()[:2] - + + # Intern public key strings so repeated identical keys share one object in RAM + if from_public_key: + from_public_key = sys.intern(from_public_key) + if to_public_key: + to_public_key = sys.intern(to_public_key) + edge_key = (from_prefix, to_prefix) now = datetime.now() - + # Update or create edge if edge_key in self.edges: edge = self.edges[edge_key] edge['observation_count'] += 1 edge['last_seen'] = now - + # Update average hop position if hop_position is not None: current_avg = edge.get('avg_hop_position') @@ -139,21 +194,21 @@ class MeshGraph: else: # First time setting hop position edge['avg_hop_position'] = hop_position - + # Update public keys if provided (always update if we have a better key) # This allows us to fill in missing keys on existing edges if from_public_key: edge['from_public_key'] = from_public_key if to_public_key: edge['to_public_key'] = to_public_key - + # Update geographic distance if provided if geographic_distance is not None: edge['geographic_distance'] = geographic_distance - + is_new_edge = False else: - # New edge + # New edge — also update adjacency indexes self.edges[edge_key] = { 'from_prefix': from_prefix, 'to_prefix': to_prefix, @@ -165,6 +220,8 @@ class MeshGraph: 'avg_hop_position': hop_position if hop_position is not None else None, 'geographic_distance': geographic_distance } + self._outgoing_index[from_prefix].add(to_prefix) + self._incoming_index[to_prefix].add(from_prefix) is_new_edge = True # Persist according to write strategy @@ -193,18 +250,31 @@ class MeshGraph: self._notify_web_viewer_edge(edge_key, is_new_edge) def _notify_web_viewer_edge(self, edge_key: Tuple[str, str], is_new: bool): - """Notify web viewer of edge update via bot integration""" + """Notify web viewer of edge update via bot integration. + + New edges always trigger an immediate notification. Updates to existing + edges are throttled to at most once every 10 seconds to reduce HTTP + traffic on busy meshes. + """ try: if not hasattr(self.bot, 'web_viewer_integration') or not self.bot.web_viewer_integration: return - + if not hasattr(self.bot.web_viewer_integration, 'bot_integration'): return - + edge = self.edges.get(edge_key) if not edge: return - + + # Throttle repeated updates for the same edge (new edges always notify) + now_ts = time.time() + if not is_new: + last_notified = self._notification_timestamps.get(edge_key, 0.0) + if (now_ts - last_notified) < 10.0: + return # Skip — notified recently enough + self._notification_timestamps[edge_key] = now_ts + # Prepare edge data for web viewer edge_data = { 'from_prefix': edge['from_prefix'], @@ -218,7 +288,7 @@ class MeshGraph: 'geographic_distance': edge.get('geographic_distance'), 'is_new': is_new } - + # Send update asynchronously self.bot.web_viewer_integration.bot_integration.send_mesh_edge_update(edge_data) except Exception as e: @@ -434,35 +504,38 @@ class MeshGraph: is_new: bool, conn: Optional[sqlite3.Connection] = None, location_cache: Optional[Dict[str, Tuple[float, float]]] = None, + skip_distance_recalc: bool = False, ): """Write a single edge to the database. - + Args: edge_key: (from_prefix, to_prefix) tuple. is_new: True if this is a new edge, False if updating existing. conn: Optional existing DB connection for batch operations (caller commits). location_cache: Optional cache for location lookups within a flush. + skip_distance_recalc: If True, skip distance recalculation (used by + _flush_pending_updates_sync which already recalculates before calling here). """ if edge_key not in self.edges: return - + edge = self.edges[edge_key] - - # Recalculate distance using full public keys if available (more accurate) - # This fixes issues where prefix collisions cause wrong locations to be used - if edge.get('from_public_key') or edge.get('to_public_key'): + + # Recalculate distance using full public keys if available (more accurate). + # Skipped when called from the batch flush loop, which already recalculated. + if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')): recalculated_distance = self._recalculate_distance_if_needed( edge, conn=conn, location_cache=location_cache ) if recalculated_distance is not None: edge['geographic_distance'] = recalculated_distance self.logger.debug(f"Mesh graph: Recalculated distance for {edge_key} using public keys: {recalculated_distance:.1f} km") - + try: if is_new: # Insert new edge query = ''' - INSERT INTO mesh_connections + INSERT INTO mesh_connections (from_prefix, to_prefix, from_public_key, to_public_key, observation_count, first_seen, last_seen, avg_hop_position, geographic_distance) @@ -480,10 +553,10 @@ class MeshGraph: edge.get('geographic_distance') ) else: - # Update existing edge - recalculate distance if we now have public keys - # Only update distance if we have at least one public key and current distance seems wrong + # Update existing edge — recalculate distance if we now have public keys, + # but only when not already done by the caller (skip_distance_recalc=False). current_distance = edge.get('geographic_distance') - if (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance: + if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance: recalculated = self._recalculate_distance_if_needed( edge, conn=conn, location_cache=location_cache ) @@ -581,6 +654,51 @@ class MeshGraph: self.logger.debug(f"Error building update params for {edge_key}: {e}") return None + def prune_expired_edges(self) -> int: + """Remove edges from the in-memory graph that have exceeded graph_edge_expiration_days. + + Only evicts from RAM — the database rows are kept so that historical data is + preserved and can be reloaded if the expiration window is later widened. + + Returns: + int: Number of edges evicted. + """ + if self.edge_expiration_days <= 0: + return 0 + + cutoff = datetime.now() - timedelta(days=self.edge_expiration_days) + expired_keys = [] + for edge_key, edge in self.edges.items(): + last_seen = edge.get('last_seen') + if last_seen is None: + continue + if isinstance(last_seen, str): + try: + last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00')) + except ValueError: + continue + if last_seen < cutoff: + expired_keys.append(edge_key) + + for edge_key in expired_keys: + from_prefix, to_prefix = edge_key + del self.edges[edge_key] + # Clean up adjacency indexes + if from_prefix in self._outgoing_index: + self._outgoing_index[from_prefix].discard(to_prefix) + if not self._outgoing_index[from_prefix]: + del self._outgoing_index[from_prefix] + if to_prefix in self._incoming_index: + self._incoming_index[to_prefix].discard(from_prefix) + if not self._incoming_index[to_prefix]: + del self._incoming_index[to_prefix] + # Drop stale notification timestamp if present + self._notification_timestamps.pop(edge_key, None) + + if expired_keys: + self.logger.debug(f"Pruned {len(expired_keys)} expired graph edges (older than {self.edge_expiration_days} days)") + return len(expired_keys) + def _start_batch_writer(self): """Start background task for batched writes.""" def batch_writer_loop(): @@ -589,7 +707,9 @@ class MeshGraph: if not self._shutdown_event.is_set(): # Flush synchronously (database operations are synchronous) self._flush_pending_updates_sync() - + # Periodically evict expired edges from RAM + self.prune_expired_edges() + import threading batch_thread = threading.Thread(target=batch_writer_loop, daemon=True) batch_thread.start() @@ -630,7 +750,9 @@ class MeshGraph: (edge_key[0], edge_key[1]), ) is_new = cursor.fetchone() is None - self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache) + # Distance was already recalculated above — tell _write_edge_to_db to skip it + self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache, + skip_distance_recalc=True) if conn: conn.commit() except Exception as e: @@ -684,27 +806,47 @@ class MeshGraph: def get_outgoing_edges(self, prefix: str) -> List[Dict]: """Get all edges originating from a node. - + + Uses the adjacency index for O(1) lookup instead of a full scan. + Args: prefix: Node prefix. - + Returns: List of edge dictionaries. """ prefix = prefix.lower()[:2] - return [edge for (f, t), edge in self.edges.items() if f == prefix] - + to_prefixes = self._outgoing_index.get(prefix) + if not to_prefixes: + return [] + result = [] + for to_prefix in to_prefixes: + edge = self.edges.get((prefix, to_prefix)) + if edge is not None: + result.append(edge) + return result + def get_incoming_edges(self, prefix: str) -> List[Dict]: """Get all edges ending at a node. - + + Uses the adjacency index for O(1) lookup instead of a full scan. + Args: prefix: Node prefix. - + Returns: List of edge dictionaries. """ prefix = prefix.lower()[:2] - return [edge for (f, t), edge in self.edges.items() if t == prefix] + from_prefixes = self._incoming_index.get(prefix) + if not from_prefixes: + return [] + result = [] + for from_prefix in from_prefixes: + edge = self.edges.get((from_prefix, prefix)) + if edge is not None: + result.append(edge) + return result def validate_path_segment(self, from_prefix: str, to_prefix: str, min_observations: int = 1, diff --git a/modules/message_handler.py b/modules/message_handler.py index f0e57bc..9d7fb69 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -536,7 +536,9 @@ class MessageHandler: # Update mesh graph with edges from the advert path # Create edge from advertising device to first hop in path - if out_path and out_path_len > 0 and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph: + if (out_path and out_path_len > 0 + and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph + and self.bot.mesh_graph.capture_enabled): self._update_mesh_graph_from_advert(advert_data, out_path, out_path_len, packet_info) # Store complete path in observed_paths table @@ -1605,7 +1607,8 @@ class MessageHandler: # Update mesh graph with trace path - bot is the destination, so we can confirm these edges # Since the bot received this trace packet, it's the destination node - self._update_mesh_graph_from_trace(path_hashes, packet_info) + if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: + self._update_mesh_graph_from_trace(path_hashes, packet_info) else: path_string = "Direct" if hops == 0 else f"Unknown routing ({hops} hops)" self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string}") @@ -1619,7 +1622,8 @@ class MessageHandler: path_string = ','.join(path_nodes) self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)") # Update mesh graph with path edges - self._update_mesh_graph(path_nodes, packet_info) + if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: + self._update_mesh_graph(path_nodes, packet_info) else: # Method 2: Try path_hex field path_hex = packet_info.get('path_hex', '') @@ -1629,7 +1633,8 @@ class MessageHandler: path_string = ','.join(path_nodes) self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET HEX: {path_string} ({hops} hops)") # Update mesh graph with path edges - self._update_mesh_graph(path_nodes, packet_info) + if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: + self._update_mesh_graph(path_nodes, packet_info) else: # Method 3: Try path_info.path field path_info = packet_info.get('path_info', {}) @@ -1638,7 +1643,8 @@ class MessageHandler: path_string = ','.join(path_nodes) self.logger.info(f"🎯 EXTRACTED PATH FROM PATH_INFO: {path_string} ({hops} hops)") # Update mesh graph with path edges - self._update_mesh_graph(path_nodes, packet_info) + if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: + self._update_mesh_graph(path_nodes, packet_info) else: # No path found - this is truly unknown path_string = "Direct" if hops == 0 else "Unknown routing" diff --git a/tests/conftest.py b/tests/conftest.py index f993e59..3ffbbee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,6 +124,7 @@ def test_config(): config.set('Path_Command', 'graph_batch_max_pending', '100') config.set('Path_Command', 'graph_startup_load_days', '0') # Don't load old data in tests config.set('Path_Command', 'graph_edge_expiration_days', '7') + config.set('Path_Command', 'graph_capture_enabled', 'true') config.set('Path_Command', 'graph_use_bidirectional', 'true') config.set('Path_Command', 'graph_use_hop_position', 'true') config.set('Path_Command', 'graph_multi_hop_enabled', 'true') diff --git a/tests/unit/test_mesh_graph_optimizations.py b/tests/unit/test_mesh_graph_optimizations.py new file mode 100644 index 0000000..89fb328 --- /dev/null +++ b/tests/unit/test_mesh_graph_optimizations.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +""" +Unit tests for MeshGraph performance optimizations. + +Covers the optimizations added for low-memory devices (Raspberry Pi Zero 2 W): + - Adjacency indexes (_outgoing_index / _incoming_index) for O(1) lookups + - sys.intern() public-key string deduplication + - prune_expired_edges() and edge expiration SQL filter + - Web-viewer notification throttle (_notification_timestamps) + - capture_enabled flag (graph_capture_enabled config setting) +""" + +import time +import sqlite3 +import pytest +from datetime import datetime, timedelta +from unittest.mock import MagicMock + +from modules.mesh_graph import MeshGraph + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _make_key(prefix: str) -> str: + """Generate a deterministic 64-char hex public key from a 2-char prefix.""" + return (prefix.lower() * 32)[:64] + + +# --------------------------------------------------------------------------- +# 1. Adjacency Indexes +# --------------------------------------------------------------------------- + +@pytest.mark.unit +class TestAdjacencyIndexes: + """Verify that _outgoing_index and _incoming_index are maintained correctly.""" + + def test_index_populated_on_add_edge(self, mesh_graph): + """Adding an edge must update both adjacency indexes.""" + mesh_graph.add_edge('ab', 'cd') + + assert 'cd' in mesh_graph._outgoing_index['ab'] + assert 'ab' in mesh_graph._incoming_index['cd'] + + def test_index_not_duplicated_on_update(self, mesh_graph): + """Updating an existing edge must not add duplicate entries to the sets.""" + mesh_graph.add_edge('ab', 'cd') + mesh_graph.add_edge('ab', 'cd') # second call is an update + + assert len(mesh_graph._outgoing_index['ab']) == 1 + assert len(mesh_graph._incoming_index['cd']) == 1 + + def test_get_outgoing_edges_uses_index(self, mesh_graph): + """get_outgoing_edges() must return all edges from a node via the index.""" + mesh_graph.add_edge('ab', 'cd') + mesh_graph.add_edge('ab', 'ef') + + result = mesh_graph.get_outgoing_edges('ab') + + assert len(result) == 2 + to_prefixes = {e['to_prefix'] for e in result} + assert to_prefixes == {'cd', 'ef'} + + def test_get_incoming_edges_uses_index(self, mesh_graph): + """get_incoming_edges() must return all edges to a node via the index.""" + mesh_graph.add_edge('ab', 'ef') + mesh_graph.add_edge('cd', 'ef') + + result = mesh_graph.get_incoming_edges('ef') + + assert len(result) == 2 + from_prefixes = {e['from_prefix'] for e in result} + assert from_prefixes == {'ab', 'cd'} + + def test_get_outgoing_edges_empty_for_unknown_prefix(self, mesh_graph): + """get_outgoing_edges() for an unknown prefix must return [] without raising.""" + result = mesh_graph.get_outgoing_edges('zz') + assert result == [] + + def test_get_incoming_edges_empty_for_unknown_prefix(self, mesh_graph): + """get_incoming_edges() for an unknown prefix must return [] without raising.""" + result = mesh_graph.get_incoming_edges('zz') + assert result == [] + + def test_index_consistent_with_edges_dict(self, mesh_graph): + """Every (from, to) pair in self.edges must be reflected in both indexes.""" + mesh_graph.add_edge('ab', 'cd') + mesh_graph.add_edge('ab', 'ef') + mesh_graph.add_edge('cd', 'ef') + + for (from_p, to_p) in mesh_graph.edges: + assert to_p in mesh_graph._outgoing_index[from_p], \ + f"Missing {to_p} in _outgoing_index[{from_p}]" + assert from_p in mesh_graph._incoming_index[to_p], \ + f"Missing {from_p} in _incoming_index[{to_p}]" + + +# --------------------------------------------------------------------------- +# 2. Public Key Interning +# --------------------------------------------------------------------------- + +@pytest.mark.unit +class TestPublicKeyInterning: + """Verify that sys.intern() causes identical public-key strings to share identity.""" + + def test_same_key_shared_across_edges(self, mesh_graph): + """Identical public keys stored on different edges must be the same object.""" + shared_key = _make_key('ab') + + mesh_graph.add_edge('ab', 'cd', from_public_key=shared_key) + mesh_graph.add_edge('ab', 'ef', from_public_key=shared_key) + + edge1 = mesh_graph.get_edge('ab', 'cd') + edge2 = mesh_graph.get_edge('ab', 'ef') + + # 'is' checks object identity — only true if sys.intern() is working + assert edge1['from_public_key'] is edge2['from_public_key'] + + def test_interning_does_not_alter_value(self, mesh_graph): + """sys.intern() must not change the string's value.""" + key = _make_key('ab') + mesh_graph.add_edge('ab', 'cd', from_public_key=key) + + edge = mesh_graph.get_edge('ab', 'cd') + assert edge['from_public_key'] == key + + +# --------------------------------------------------------------------------- +# 3. Edge Expiration / prune_expired_edges() +# --------------------------------------------------------------------------- + +@pytest.mark.unit +class TestEdgeExpiration: + """Verify that prune_expired_edges() correctly evicts stale edges from RAM.""" + + def _add_expired_edge(self, mesh_graph, from_p, to_p, days_old=10): + """Add an edge and manually back-date its last_seen.""" + mesh_graph.add_edge(from_p, to_p) + edge_key = (from_p, to_p) + mesh_graph.edges[edge_key]['last_seen'] = datetime.now() - timedelta(days=days_old) + + def test_prune_removes_expired_edge_from_edges(self, mesh_graph): + """An edge older than expiration_days must be removed from self.edges.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + assert ('ab', 'cd') in mesh_graph.edges # sanity + + mesh_graph.prune_expired_edges() + + assert ('ab', 'cd') not in mesh_graph.edges + + def test_prune_removes_expired_edge_from_outgoing_index(self, mesh_graph): + """Pruned edge must be removed from _outgoing_index.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + + mesh_graph.prune_expired_edges() + + assert 'cd' not in mesh_graph._outgoing_index.get('ab', set()) + + def test_prune_removes_expired_edge_from_incoming_index(self, mesh_graph): + """Pruned edge must be removed from _incoming_index.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + + mesh_graph.prune_expired_edges() + + assert 'ab' not in mesh_graph._incoming_index.get('cd', set()) + + def test_prune_keeps_fresh_edge(self, mesh_graph): + """An edge with a recent last_seen must NOT be pruned.""" + mesh_graph.add_edge('ab', 'cd') # last_seen = now + + mesh_graph.prune_expired_edges() + + assert ('ab', 'cd') in mesh_graph.edges + + def test_prune_cleans_notification_timestamp(self, mesh_graph): + """prune_expired_edges() must also clean up the notification timestamp entry.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + mesh_graph._notification_timestamps[('ab', 'cd')] = time.time() + + mesh_graph.prune_expired_edges() + + assert ('ab', 'cd') not in mesh_graph._notification_timestamps + + def test_prune_removes_empty_index_entries(self, mesh_graph): + """When the last edge for a prefix is pruned, the index key must be removed.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + + mesh_graph.prune_expired_edges() + + assert 'ab' not in mesh_graph._outgoing_index + assert 'cd' not in mesh_graph._incoming_index + + def test_prune_returns_count_of_removed_edges(self, mesh_graph): + """prune_expired_edges() must return the number of edges it removed.""" + self._add_expired_edge(mesh_graph, 'ab', 'cd') + self._add_expired_edge(mesh_graph, 'ab', 'ef') + mesh_graph.add_edge('ab', 'gh') # fresh — should NOT be pruned + + count = mesh_graph.prune_expired_edges() + + assert count == 2 + + def test_prune_disabled_when_expiration_days_zero(self, mesh_graph): + """When edge_expiration_days == 0, prune_expired_edges() must do nothing.""" + mesh_graph.edge_expiration_days = 0 + self._add_expired_edge(mesh_graph, 'ab', 'cd') + + count = mesh_graph.prune_expired_edges() + + assert count == 0 + assert ('ab', 'cd') in mesh_graph.edges + + def test_startup_sql_filter_excludes_expired_edges(self, mock_bot): + """MeshGraph.__init__ must not load edges older than edge_expiration_days.""" + # Insert one expired row and one fresh row directly into the DB + db_path = mock_bot.db_manager.db_path + expired_ts = (datetime.now() - timedelta(days=30)).isoformat() + fresh_ts = datetime.now().isoformat() + + with sqlite3.connect(db_path) as conn: + conn.execute( + '''INSERT INTO mesh_connections + (from_prefix, to_prefix, observation_count, first_seen, last_seen) + VALUES (?, ?, ?, ?, ?)''', + ('aa', 'bb', 5, expired_ts, expired_ts), + ) + conn.execute( + '''INSERT INTO mesh_connections + (from_prefix, to_prefix, observation_count, first_seen, last_seen) + VALUES (?, ?, ?, ?, ?)''', + ('cc', 'dd', 3, fresh_ts, fresh_ts), + ) + conn.commit() + + # graph_edge_expiration_days = 7 is set in the test_config fixture + graph = MeshGraph(mock_bot) + + assert ('aa', 'bb') not in graph.edges, "Expired edge should not have been loaded" + assert ('cc', 'dd') in graph.edges, "Fresh edge must be loaded" + + +# --------------------------------------------------------------------------- +# 4. Web Viewer Notification Throttle +# --------------------------------------------------------------------------- + +@pytest.mark.unit +class TestNotificationThrottle: + """Verify that _notify_web_viewer_edge() throttles repeated update notifications.""" + + @pytest.fixture + def notifying_graph(self, mock_bot): + """MeshGraph with a mock web_viewer_integration so notifications are trackable.""" + web_vi = MagicMock() + web_vi.bot_integration = MagicMock() + web_vi.bot_integration.send_mesh_edge_update = MagicMock() + mock_bot.web_viewer_integration = web_vi + mock_bot.config.set('Path_Command', 'graph_write_strategy', 'immediate') + graph = MeshGraph(mock_bot) + return graph + + def _notification_count(self, graph): + return graph.bot.web_viewer_integration.bot_integration.send_mesh_edge_update.call_count + + def test_new_edge_always_notifies(self, notifying_graph): + """A brand-new edge must trigger an immediate notification.""" + notifying_graph.add_edge('ab', 'cd') + assert self._notification_count(notifying_graph) == 1 + + def test_repeated_update_within_window_skips_notification(self, notifying_graph): + """A second add_edge() call within the 10-second window must NOT notify again.""" + notifying_graph.add_edge('ab', 'cd') # new edge → notifies + notifying_graph.add_edge('ab', 'cd') # update → throttled + + assert self._notification_count(notifying_graph) == 1 + + def test_update_after_throttle_window_notifies(self, notifying_graph): + """An update after the 10-second throttle window has passed MUST notify.""" + notifying_graph.add_edge('ab', 'cd') # new edge → notifies + # Backdate the stored timestamp to simulate 11 seconds having elapsed + notifying_graph._notification_timestamps[('ab', 'cd')] = time.time() - 11.0 + notifying_graph.add_edge('ab', 'cd') # update after window → should notify + + assert self._notification_count(notifying_graph) == 2 + + def test_throttle_is_per_edge(self, notifying_graph): + """Each edge has its own throttle; two new edges must each notify once.""" + notifying_graph.add_edge('ab', 'cd') + notifying_graph.add_edge('ab', 'ef') + + assert self._notification_count(notifying_graph) == 2 + + +# --------------------------------------------------------------------------- +# 5. capture_enabled Flag +# --------------------------------------------------------------------------- + +@pytest.mark.unit +class TestCaptureEnabled: + """Verify that graph_capture_enabled controls data collection and thread startup.""" + + def test_capture_enabled_by_default(self, mesh_graph): + """capture_enabled must be True when not explicitly configured.""" + assert mesh_graph.capture_enabled is True + + def test_capture_disabled_prevents_add_edge(self, mesh_graph): + """When capture_enabled is False, add_edge() must be a no-op.""" + mesh_graph.capture_enabled = False + mesh_graph.add_edge('ab', 'cd') + + assert mesh_graph.get_edge('ab', 'cd') is None + + def test_capture_disabled_leaves_indexes_empty(self, mesh_graph): + """When capture is off, no index entries must be created.""" + mesh_graph.capture_enabled = False + mesh_graph.add_edge('ab', 'cd') + + assert 'cd' not in mesh_graph._outgoing_index.get('ab', set()) + assert 'ab' not in mesh_graph._incoming_index.get('cd', set()) + + def test_capture_disabled_from_config(self, mock_bot): + """MeshGraph must read graph_capture_enabled = false from config.""" + mock_bot.config.set('Path_Command', 'graph_capture_enabled', 'false') + + graph = MeshGraph(mock_bot) + + assert graph.capture_enabled is False + + def test_capture_disabled_no_batch_thread_started(self, mock_bot): + """When capture is off, the background batch writer thread must NOT start.""" + mock_bot.config.set('Path_Command', 'graph_capture_enabled', 'false') + mock_bot.config.set('Path_Command', 'graph_write_strategy', 'batched') + + graph = MeshGraph(mock_bot) + + # Either _batch_thread was never set or it is None + batch_thread = getattr(graph, '_batch_thread', None) + assert batch_thread is None, "Batch writer thread should not start when capture is disabled" From ff2f03d363d12336dce678605c765fc4be313837 Mon Sep 17 00:00:00 2001 From: agessaman Date: Wed, 18 Feb 2026 15:22:09 -0800 Subject: [PATCH 02/10] Enhance FAQ and path command configuration documentation - Added a new section in the FAQ detailing how to run meshcore-bot on a Raspberry Pi Zero 2 W, including specific configuration settings to manage memory usage effectively. - Updated the path command configuration documentation to clarify the default values and usage of `graph_startup_load_days` and `graph_capture_enabled`, improving user understanding of these settings. --- docs/faq.md | 25 +++++++++++++++++++++++++ docs/path-command-config.md | 11 +++++++++-- modules/mesh_graph.py | 13 ++++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/docs/faq.md b/docs/faq.md index 344dd97..5a1876a 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -27,3 +27,28 @@ Without `--upgrade`, the script does *not* update the service file (systemd/laun ### How can I generate a custom command reference for my bot users? See [Custom command reference website](command-reference-website.md): it explains how to use `generate_website.py` to build a single-page HTML from your config (with optional styles) and upload it to your site. + +## Hardware and performance + +### How do I run meshcore-bot on a Raspberry Pi Zero 2 W? + +The Pi Zero 2 W has 512 MB of RAM, which is enough to run the bot, but the Mesh Graph can grow large on a busy mesh. Add the following to the `[Path_Command]` section of your `config.ini` to keep memory usage in check: + +```ini +[Path_Command] +# Limit startup memory: only load edges seen in the last 7 days. +# Edges older than this have near-zero path confidence anyway. +graph_startup_load_days = 7 + +# Evict edges from RAM after 7 days without a new observation. +graph_edge_expiration_days = 7 + +# Write graph updates in batches rather than on every packet. +graph_write_strategy = batched + +# If you don't use the !path command at all, disable graph capture +# entirely to eliminate the background thread and all graph overhead. +# graph_capture_enabled = false +``` + +These settings do not affect path prediction accuracy: edges older than a few days carry negligible confidence due to the 48-hour recency half-life used by the scoring algorithm. diff --git a/docs/path-command-config.md b/docs/path-command-config.md index 545706b..5691281 100644 --- a/docs/path-command-config.md +++ b/docs/path-command-config.md @@ -201,8 +201,15 @@ These settings control how graph edges are stored in the database. **`graph_startup_load_days`** (days, 0 = load all) - Load only edges seen in last N days on startup -- `0` = load all edges (recommended for development) -- Default: `0` +- `0` = load all edges (use on servers with ample RAM) +- Default: `14` (set to `0` in `config.ini` to load all) + +**`graph_capture_enabled`** (boolean) +- When `false`, no new edge data is collected from packets and the background + batch writer thread is not started — reducing CPU and RAM overhead +- Edges already in the database are still used for path validation +- Set to `false` on devices that don't use the path command +- Default: `true` ## Preset Configurations diff --git a/modules/mesh_graph.py b/modules/mesh_graph.py index 7934641..3833b1a 100644 --- a/modules/mesh_graph.py +++ b/modules/mesh_graph.py @@ -533,13 +533,24 @@ class MeshGraph: try: if is_new: - # Insert new edge + # Upsert new edge. + # Use INSERT ... ON CONFLICT DO UPDATE so that if the row already exists + # in the database (e.g. it was filtered out of the in-memory graph at + # startup by startup_load_days / edge_expiration_days, or written by a + # concurrent process), we merge rather than fail with UNIQUE constraint. query = ''' INSERT INTO mesh_connections (from_prefix, to_prefix, from_public_key, to_public_key, observation_count, first_seen, last_seen, avg_hop_position, geographic_distance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(from_prefix, to_prefix) DO UPDATE SET + observation_count = MAX(observation_count, excluded.observation_count), + last_seen = MAX(last_seen, excluded.last_seen), + avg_hop_position = excluded.avg_hop_position, + geographic_distance = COALESCE(excluded.geographic_distance, geographic_distance), + from_public_key = COALESCE(excluded.from_public_key, from_public_key), + to_public_key = COALESCE(excluded.to_public_key, to_public_key) ''' params = ( edge['from_prefix'], From de6580c231cd2a56ac2be680ca4f809f492f0f14 Mon Sep 17 00:00:00 2001 From: agessaman Date: Wed, 18 Feb 2026 15:36:09 -0800 Subject: [PATCH 03/10] Enhance web viewer troubleshooting documentation - Added a comprehensive troubleshooting section for accessing the web viewer on devices like Orange Pi and SBCs, detailing configuration checks, process verification, log inspection, firewall settings, and standalone run instructions. - Updated installation commands to include necessary dependencies for Flask and flask-socketio, improving clarity on setup requirements. - Revised port conflict resolution steps to include updated commands for checking active ports. --- docs/web-viewer.md | 68 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/docs/web-viewer.md b/docs/web-viewer.md index 04996d5..7eb7ce8 100644 --- a/docs/web-viewer.md +++ b/docs/web-viewer.md @@ -147,9 +147,73 @@ You can keep or remove the old `bot_data.db` file after verifying the viewer wor ## Troubleshooting +### Web viewer not accessible (e.g. Orange Pi / SBC) + +If the viewer does not load from another device (e.g. from your phone or PC while the bot runs on an Orange Pi), work through these steps on the Pi. + +1. **Confirm config** + - In `config.ini` under `[Web_Viewer]`: + - `enabled = true` + - `auto_start = true` (if you want it to start with the bot) + - `host = 0.0.0.0` (required for access from other devices; `127.0.0.1` is localhost only) + - `port = 8080` (or another port 1024–65535) + - Restart the bot after changing config. + +2. **Check that the viewer process is running** + ```bash + # From project root on the Pi + ss -tlnp | grep 8080 + # or + netstat -tlnp | grep 8080 + ``` + If nothing listens on your port, the viewer did not start or has exited. + +3. **Inspect viewer logs** + - When run by the bot, the viewer writes to: + - `logs/web_viewer_stdout.log` + - `logs/web_viewer_stderr.log` + - Look for Python tracebacks, "Address already in use", or missing dependencies (e.g. Flask, flask-socketio). + - Optional: run the viewer manually to see errors in the terminal: + ```bash + cd /path/to/meshcore-bot + python3 modules/web_viewer/app.py --config config.ini --host 0.0.0.0 --port 8080 + ``` + +4. **Check integration startup** + - Bot logs may show: `Web viewer integration failed: ...` or `Web viewer integration initialized`. + - If integration failed, the viewer subprocess is never started; fix the error shown (e.g. invalid `host` or `port` in config). + +5. **Firewall** + - Many SBC images (e.g. Orange Pi, Armbian minimal) do **not** ship with a firewall; if `curl` to localhost works and `host = 0.0.0.0`, the blocker may be network (Wi‑Fi client isolation, different subnet, or router). Check from a device on the same LAN using `http://:8080`. + - If your system uses **ufw**: + ```bash + sudo ufw status + sudo ufw allow 8080/tcp + sudo ufw reload + ``` + - If `ufw` is not installed (e.g. `sudo: ufw: command not found`), you may have no host firewall—that’s common on embedded images. To allow the port with **iptables** (often available when ufw is not): + ```bash + sudo iptables -I INPUT -p tcp --dport 8080 -j ACCEPT + ``` + (Rules may not persist across reboots unless you use a persistence method for your distro.) + - If you prefer ufw, install it (e.g. `sudo apt install ufw`) and use the ufw commands above. + +6. **Test from the Pi first** + ```bash + curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8080/ + ``` + If this returns `200`, the viewer is running and the issue is binding or firewall. If you use `host = 0.0.0.0`, then try from another device: `http://:8080`. + +7. **Standalone run (no bot)** + - To rule out bot integration issues, start the viewer by itself (same config path so it finds the DB): + ```bash + python3 modules/web_viewer/app.py --config config.ini --host 0.0.0.0 --port 8080 + ``` + - If `restart_viewer.sh` is used, note it binds to `127.0.0.1` by default; for network access run the command above with `--host 0.0.0.0` or edit the script. + ### Flask Not Found ```bash -pip3 install flask +pip3 install flask flask-socketio ``` ### Database Not Found @@ -158,7 +222,7 @@ pip3 install flask ### Port Already in Use - Change the port in `config.ini` or stop the conflicting service -- Use `lsof -i :5000` to find what's using the port +- Use `ss -tlnp | grep 8080` or `lsof -i :8080` (if available) to find what's using the port ### Permission Denied ```bash From 3afdfe6e013edab93dda65e9fd7c9a09417e72dc Mon Sep 17 00:00:00 2001 From: agessaman Date: Wed, 18 Feb 2026 15:46:16 -0800 Subject: [PATCH 04/10] Update Raspberry Pi Zero 2 W FAQ section with memory management tips - Expanded the guidance on running meshcore-bot on Raspberry Pi Zero 2 W, detailing two steps to optimize memory usage: disabling the web viewer and tuning the Mesh Graph settings. - Included specific configuration examples to help users effectively manage resource constraints on the device. --- docs/faq.md | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/docs/faq.md b/docs/faq.md index 5a1876a..76ef9dd 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -32,7 +32,29 @@ See [Custom command reference website](command-reference-website.md): it explain ### How do I run meshcore-bot on a Raspberry Pi Zero 2 W? -The Pi Zero 2 W has 512 MB of RAM, which is enough to run the bot, but the Mesh Graph can grow large on a busy mesh. Add the following to the `[Path_Command]` section of your `config.ini` to keep memory usage in check: +The Pi Zero 2 W has 512 MB of RAM. The bot and the web viewer are two separate +Python processes; together they use roughly 300 MB on a busy mesh, which leaves +little headroom. Follow the two steps below to keep things comfortable. + +#### Step 1 — Run the bot only (saves ~150 MB) + +The web viewer is optional. If you don't need the browser-based dashboard on +the Pi itself, disable it and access it from another machine instead: + +```ini +[Web_Viewer] +enabled = false +auto_start = false +``` + +The bot continues to work normally; the web viewer just won't start on the Pi. +If you still want the dashboard, run the viewer on a desktop or server that +shares the same database file (see [MeshCore Bot Data Viewer](web-viewer.md)). + +#### Step 2 — Tune the Mesh Graph (saves another 50–100 MB on busy meshes) + +Even with the web viewer off, the Mesh Graph can grow large. Add the following +to the `[Path_Command]` section of your `config.ini`: ```ini [Path_Command] @@ -51,4 +73,6 @@ graph_write_strategy = batched # graph_capture_enabled = false ``` -These settings do not affect path prediction accuracy: edges older than a few days carry negligible confidence due to the 48-hour recency half-life used by the scoring algorithm. +These settings do not affect path prediction accuracy: edges older than a few +days carry negligible confidence due to the 48-hour recency half-life used by +the scoring algorithm. From 6dcf256c77e3a8789d6cd25907ae27eee8695fb9 Mon Sep 17 00:00:00 2001 From: agessaman Date: Thu, 19 Feb 2026 19:39:32 -0800 Subject: [PATCH 05/10] Simplify contact removal logic and add versioning to webviewer. --- .github/workflows/docker-build.yml | 11 + Dockerfile | 4 + install-service.sh | 12 + modules/commands/repeater_command.py | 27 +- modules/core.py | 18 +- modules/repeater_manager.py | 865 ++++--------------------- modules/web_viewer/app.py | 66 +- modules/web_viewer/templates/base.html | 9 + 8 files changed, 254 insertions(+), 758 deletions(-) diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index c0957a0..1747dc4 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -57,6 +57,15 @@ jobs: type=sha,prefix=sha- type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' || github.ref == 'refs/heads/master' }} + - name: Set version for web viewer footer + id: version + run: | + if [[ "${{ github.ref }}" == refs/tags/* ]]; then + echo "version=${{ github.ref_name }}" >> $GITHUB_OUTPUT + else + echo "version=dev" >> $GITHUB_OUTPUT + fi + - name: Build and push Docker image uses: docker/build-push-action@v5 with: @@ -64,6 +73,8 @@ jobs: push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + build-args: | + MESHCORE_BOT_VERSION=${{ steps.version.outputs.version }} cache-from: type=gha cache-to: type=gha,mode=max platforms: linux/amd64,linux/arm64 diff --git a/Dockerfile b/Dockerfile index d6915b6..8c8e1b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,6 +43,10 @@ COPY --from=builder /root/.local /home/meshcore/.local # Set working directory WORKDIR /app +# Version for web viewer footer (set at build time; e.g. --build-arg MESHCORE_BOT_VERSION=v1.2.3) +ARG MESHCORE_BOT_VERSION +ENV MESHCORE_BOT_VERSION=${MESHCORE_BOT_VERSION} + # Copy application files COPY --chown=meshcore:meshcore . /app/ diff --git a/install-service.sh b/install-service.sh index 85c7fd8..991decc 100755 --- a/install-service.sh +++ b/install-service.sh @@ -405,6 +405,18 @@ copy_files_smart "$SCRIPT_DIR" "$INSTALL_DIR" || { exit 1 } +# Write .version_info at install dir so web viewer and packet_capture show version after install +if command -v git &>/dev/null && [ -d "$SCRIPT_DIR/.git" ]; then + GIT_HASH="$(git -C "$SCRIPT_DIR" rev-parse --short HEAD 2>/dev/null || echo "unknown")" + if VERSION="$(git -C "$SCRIPT_DIR" describe --exact-match HEAD 2>/dev/null)"; then + INSTALLER_VER="$VERSION" + else + INSTALLER_VER="dev-${GIT_HASH}" + fi + printf '%s\n' "{\"installer_version\": \"${INSTALLER_VER}\", \"git_hash\": \"${GIT_HASH}\"}" > "$INSTALL_DIR/.version_info" + print_success "Wrote version info (${INSTALLER_VER}) to $INSTALL_DIR/.version_info" +fi + # If no config.ini in install dir, create it from config.ini.example if [ ! -f "$INSTALL_DIR/config.ini" ]; then if [ -f "$INSTALL_DIR/config.ini.example" ]; then diff --git a/modules/commands/repeater_command.py b/modules/commands/repeater_command.py index 51b8bcc..c2d405c 100644 --- a/modules/commands/repeater_command.py +++ b/modules/commands/repeater_command.py @@ -280,11 +280,7 @@ class RepeaterCommand(BaseCommand): # Force a complete refresh of contacts from device after purging self.logger.info("Forcing contact list refresh from device to ensure persistence...") try: - from meshcore_cli.meshcore_cli import next_cmd - await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["contacts"]), - timeout=30.0 - ) + await self.bot.meshcore.commands.get_contacts() self.logger.info("Contact list refreshed from device") except Exception as e: self.logger.warning(f"Failed to refresh contact list: {e}") @@ -314,21 +310,16 @@ class RepeaterCommand(BaseCommand): # Final verification: Check if contacts were actually removed from device self.logger.info("Performing final verification of contact removal...") try: - from meshcore_cli.meshcore_cli import next_cmd - await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["contacts"]), - timeout=30.0 - ) - + await self.bot.meshcore.commands.get_contacts() + # Count remaining repeaters on device - remaining_repeaters = 0 - if hasattr(self.bot.meshcore, 'contacts'): - for contact_key, contact_data in self.bot.meshcore.contacts.items(): - if self.bot.repeater_manager._is_repeater_device(contact_data): - remaining_repeaters += 1 - + remaining_repeaters = sum( + 1 for contact_data in self.bot.meshcore.contacts.values() + if self.bot.repeater_manager._is_repeater_device(contact_data) + ) + self.logger.info(f"Final verification: {remaining_repeaters} repeaters still on device") - + except Exception as e: self.logger.warning(f"Final verification failed: {e}") diff --git a/modules/core.py b/modules/core.py index 626e0c5..20f25c7 100644 --- a/modules/core.py +++ b/modules/core.py @@ -43,6 +43,22 @@ from .transmission_tracker import TransmissionTracker from .utils import resolve_path +class _DuplicateAwareConfigParser(configparser.ConfigParser): + """ConfigParser that allows duplicate options (last value wins) and logs ERROR when seen.""" + + def __init__(self, *args, **kwargs): + kwargs['strict'] = False + super().__init__(*args, **kwargs) + + def _handle_option(self, st, line, fpname): + if st.optname in st.options: + logging.getLogger(__name__).error( + "Duplicate option in config file: section [%s], option '%s' (file %s, line %s). Using last value.", + st.sectname, st.optname, fpname, getattr(st, 'lineno', '?'), + ) + st.options[st.optname] = st.optvalue + + class MeshCoreBot: """MeshCore Bot using official meshcore package. @@ -52,7 +68,7 @@ class MeshCoreBot: def __init__(self, config_file: str = "config.ini"): self.config_file = config_file - self.config = configparser.ConfigParser() + self.config = _DuplicateAwareConfigParser() self.load_config() # Setup logging diff --git a/modules/repeater_manager.py b/modules/repeater_manager.py index 1596d94..a2589ce 100644 --- a/modules/repeater_manager.py +++ b/modules/repeater_manager.py @@ -2102,57 +2102,35 @@ class RepeaterManager: """Remove a specific repeater from the device's contact list using proper MeshCore API""" self.logger.info(f"Starting purge process for public_key: {public_key}") self.logger.debug(f"Purge reason: {reason}") - + try: - # Find the contact in meshcore using proper MeshCore methods - contact_to_remove = None - contact_name = None - contact_key = None - - self.logger.debug(f"Searching through {len(self.bot.meshcore.contacts)} contacts...") - - # Try to find contact using MeshCore helper methods first - try: - # Method 1: Try to find by public key prefix - contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) - if contact_to_remove: - contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) - contact_key = public_key - self.logger.debug(f"Found contact using key prefix: {contact_name}") - except Exception as e: - self.logger.debug(f"Key prefix lookup failed: {e}") - - # Method 2: Fallback to manual search + # meshcore.contacts is keyed by public_key hex string + contact_to_remove = self.bot.meshcore.contacts.get(public_key) if not contact_to_remove: - for key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', key) == public_key: - contact_to_remove = contact_data - contact_name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) - contact_key = key - self.logger.debug(f"Found contact manually: {contact_name} (key: {contact_key})") - break - + contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) + if not contact_to_remove: self.logger.warning(f"Repeater with public key {public_key} not found in current contacts") return False - + + contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) + self.logger.debug(f"Found contact: {contact_name}") + # Check if repeater exists in database, if not add it first existing_repeater = self.db_manager.execute_query( 'SELECT id FROM repeater_contacts WHERE public_key = ?', (public_key,) ) - + if not existing_repeater: - # Add repeater to database first - contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) device_type = 'Repeater' if contact_to_remove.get('type') == 3: device_type = 'RoomServer' elif 'room' in contact_name.lower() or 'server' in contact_name.lower(): device_type = 'RoomServer' - + self.db_manager.execute_update(''' - INSERT INTO repeater_contacts + INSERT INTO repeater_contacts (public_key, name, device_type, contact_data) VALUES (?, ?, ?, ?) ''', ( @@ -2161,118 +2139,58 @@ class RepeaterManager: device_type, json.dumps(contact_to_remove) )) - + self.logger.info(f"Added repeater {contact_name} to database before purging") - - # Track whether device removal was successful - device_removal_successful = False - - # Verify contact still exists before attempting removal - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - - if not contact_still_exists: + + # Check if contact is already gone from device + if public_key not in self.bot.meshcore.contacts: self.logger.info(f"✅ Contact '{contact_name}' not found in device contacts (already removed) - treating as success") device_removal_successful = True else: # Remove the contact using the proper MeshCore API + device_removal_successful = False + self.logger.info(f"Removing contact '{contact_name}' from device using MeshCore API...") try: - self.logger.info(f"Removing contact '{contact_name}' from device using MeshCore API...") - self.logger.debug(f"Contact details: public_key={public_key}, contact_key={contact_key}, name='{contact_name}'") - - # Try different key formats for removal - removal_keys_to_try = [] - - # Add the public key if it's different from contact key - if public_key != contact_key: - removal_keys_to_try.append(public_key) - - # Add the contact key - if contact_key: - removal_keys_to_try.append(contact_key) - - # Add the public key as bytes if it's a hex string - try: - if len(public_key) == 64: # 32 bytes in hex - public_key_bytes = bytes.fromhex(public_key) - removal_keys_to_try.append(public_key_bytes) - except: - pass - - self.logger.debug(f"Will try removal with keys: {removal_keys_to_try}") - - # Try each key format - for key_to_try in removal_keys_to_try: - try: - self.logger.debug(f"Trying removal with key: {key_to_try} (type: {type(key_to_try)})") - result = await asyncio.wait_for( - self.bot.meshcore.commands.remove_contact(key_to_try), - timeout=30.0 - ) - - # Check if removal was successful - if result.type == EventType.OK: - device_removal_successful = True - self.logger.info(f"✅ Successfully removed contact '{contact_name}' from device using key: {key_to_try}") - break - elif result.type == EventType.ERROR: - # Log detailed error information - error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' - self.logger.debug(f"❌ Removal failed with key {key_to_try}: {result}") - self.logger.debug(f"❌ Error type: {result.type}, Error code: {error_code}") - - # Error code 2 typically means "contact not found" - treat as success - if error_code == 2: - self.logger.info(f"✅ Contact '{contact_name}' not found (already removed) - treating as success") - device_removal_successful = True - break - else: - # Log other error types - error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' - self.logger.debug(f"❌ Removal failed with key {key_to_try}: {result}") - self.logger.debug(f"❌ Error type: {result.type}, Error code: {error_code}") - except Exception as e: - self.logger.debug(f"Exception with key {key_to_try}: {e}") - continue - - # If all key formats failed, try fallback methods - if not device_removal_successful: - self.logger.warning(f"All key formats failed for '{contact_name}' - trying fallback methods...") - device_removal_successful = await self._try_fallback_removal_methods(public_key, contact_name, reason) - + result = await self.bot.meshcore.commands.remove_contact(public_key) + + if result.type == EventType.OK: + device_removal_successful = True + self.logger.info(f"✅ Successfully removed contact '{contact_name}' from device") + elif result.type == EventType.ERROR: + error_code = result.payload.get('error_code') + reason_str = result.payload.get('reason') + if error_code == 2: + # Device says contact not found - treat as success + self.logger.info(f"✅ Contact '{contact_name}' not found on device (already removed) - treating as success") + device_removal_successful = True + else: + self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") except Exception as e: - self.logger.error(f"Failed to remove contact '{contact_name}' from device: {e}") - # Try fallback methods on exception - self.logger.info(f"Attempting fallback methods for contact '{contact_name}' due to exception...") - device_removal_successful = await self._try_fallback_removal_methods(public_key, contact_name, reason) - + self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") + # Only mark as inactive in database if device removal was successful if device_removal_successful: self.db_manager.execute_update( 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', (public_key,) ) - + # Log the purge action self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purged', ?, ?, ?) ''', (public_key, contact_name, reason)) - + self.logger.info(f"Successfully purged repeater {contact_name}: {reason}") - self.logger.debug(f"Purge process completed successfully for {contact_name}") return True else: self.logger.error(f"Failed to remove repeater {contact_name} from device - not marking as purged in database") - # Log the failed attempt self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False - + except Exception as e: self.logger.error(f"Error purging repeater {public_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") @@ -2282,369 +2200,125 @@ class RepeaterManager: """Remove a companion contact from the device's contact list""" self.logger.info(f"Starting companion purge process for public_key: {public_key}") self.logger.debug(f"Purge reason: {reason}") - + try: # Safety check: Never purge ACL members if self._is_in_acl(public_key): self.logger.warning(f"❌ Attempted to purge companion in ACL - BLOCKED: {public_key[:16]}...") return False - - # Find the contact in meshcore - contact_to_remove = None - contact_name = None - contact_key = None - - self.logger.debug(f"Searching through {len(self.bot.meshcore.contacts)} contacts...") - - # Try to find contact using MeshCore helper methods first - try: - contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) - if contact_to_remove: - contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) - contact_key = public_key - self.logger.debug(f"Found contact using key prefix: {contact_name}") - except Exception as e: - self.logger.debug(f"Key prefix lookup failed: {e}") - - # Fallback to manual search + + # meshcore.contacts is keyed by public_key hex string + contact_to_remove = self.bot.meshcore.contacts.get(public_key) if not contact_to_remove: - for key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', key) == public_key: - # Verify it's a companion - if not self._is_companion_device(contact_data): - self.logger.warning(f"Contact {public_key} is not a companion - skipping") - return False - contact_to_remove = contact_data - contact_name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) - contact_key = key - self.logger.debug(f"Found companion manually: {contact_name} (key: {contact_key})") - break - + contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) + if not contact_to_remove: self.logger.warning(f"Companion with public key {public_key} not found in current contacts") return False - - # Track whether device removal was successful - device_removal_successful = False - - # Verify contact still exists before attempting removal - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - - if not contact_still_exists: + + if not self._is_companion_device(contact_to_remove): + self.logger.warning(f"Contact {public_key} is not a companion - skipping") + return False + + contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) + self.logger.debug(f"Found companion: {contact_name}") + + # Check if contact is already gone from device + if public_key not in self.bot.meshcore.contacts: self.logger.info(f"✅ Contact '{contact_name}' not found in device contacts (already removed) - treating as success") device_removal_successful = True else: - # Remove the contact using the MeshCore API: meshcore.commands.remove_contact(key) + # Remove the contact using the proper MeshCore API + device_removal_successful = False + self.logger.info(f"Removing companion '{contact_name}' from device using MeshCore API...") try: - self.logger.info(f"Removing companion '{contact_name}' from device using meshcore.commands.remove_contact()...") - self.logger.debug(f"Contact details: public_key={public_key}, contact_key={contact_key}, name='{contact_name}'") - - # Check if the commands.remove_contact method exists - if not hasattr(self.bot.meshcore, 'commands') or not hasattr(self.bot.meshcore.commands, 'remove_contact'): - self.logger.error(f"❌ meshcore.commands.remove_contact() method not found on meshcore object") - device_removal_successful = False - else: - # Use the MeshCore 2.2+ API: meshcore.commands.remove_contact(key) - # remove_contact accepts: str (hex public key), bytes, or dict (contact object) - removal_keys_to_try = [] - - # Add public_key if it's a valid 64-char hex string - if public_key and len(public_key) == 64: - removal_keys_to_try.append(('public_key', public_key)) - - # Add contact_key if it's different and valid - if contact_key and contact_key != public_key and len(contact_key) == 64: - removal_keys_to_try.append(('contact_key', contact_key)) - - # Try each key format - for key_name, key_to_try in removal_keys_to_try: - try: - self.logger.debug(f"Calling meshcore.commands.remove_contact({key_name}='{key_to_try[:16]}...')") - result = await asyncio.wait_for( - self.bot.meshcore.commands.remove_contact(key_to_try), - timeout=30.0 - ) - - # Check if removal was successful (meshcore 2.2+ returns Event object) - if result.type == EventType.OK: - device_removal_successful = True - self.logger.info(f"✅ Successfully removed contact '{contact_name}' via meshcore 2.2+ API using {key_name}") - break - elif result.type == EventType.ERROR: - error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' - if error_code == 2: - # Contact not found (already removed) - treat as success - device_removal_successful = True - self.logger.info(f"✅ Contact '{contact_name}' not found (already removed) - treating as success") - break - else: - self.logger.debug(f"remove_contact({key_name}) returned error_code {error_code}, trying next key...") - continue - else: - self.logger.debug(f"remove_contact({key_name}) returned unexpected event type: {result.type}") - continue - - except Exception as e: - self.logger.debug(f"remove_contact({key_name}) failed: {type(e).__name__}: {e}, trying next key...") - continue - - if not removal_keys_to_try: - self.logger.error(f"❌ No valid key available for remove_contact") - device_removal_successful = False - elif not device_removal_successful: - self.logger.error(f"❌ All remove_contact attempts failed") - device_removal_successful = False - - except Exception as e: - self.logger.error(f"❌ Exception during contact removal: {type(e).__name__}: {e}") - device_removal_successful = False - - # Verify the contact was actually removed by checking contacts list - if device_removal_successful: - # First, manually remove from local cache since the API reported success - # This prevents false negatives if the device hasn't updated its list yet - if contact_key in self.bot.meshcore.contacts: - del self.bot.meshcore.contacts[contact_key] - self.logger.debug(f"Removed '{contact_name}' from local contacts cache") - - # Wait a moment for the device to process the removal - await asyncio.sleep(2.0) - - # Refresh contacts from device to get latest state - try: - # Use the proper meshcore API to refresh contacts - if hasattr(self.bot.meshcore, 'ensure_contacts'): - await self.bot.meshcore.ensure_contacts(follow=True) - elif hasattr(self.bot.meshcore.commands, 'get_contacts'): - await self.bot.meshcore.commands.get_contacts(timeout=10.0) + result = await self.bot.meshcore.commands.remove_contact(public_key) + + if result.type == EventType.OK: + device_removal_successful = True + self.logger.info(f"✅ Successfully removed companion '{contact_name}' from device") + elif result.type == EventType.ERROR: + error_code = result.payload.get('error_code') + reason_str = result.payload.get('reason') + if error_code == 2: + # Device says contact not found - treat as success + self.logger.info(f"✅ Companion '{contact_name}' not found on device (already removed) - treating as success") + device_removal_successful = True else: - # Fallback: use CLI to refresh - from meshcore_cli.meshcore_cli import next_cmd - await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["contacts"]), - timeout=15.0 - ) + self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") + except Exception as e: + self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") + + if device_removal_successful: + # Remove from local cache optimistically, then refresh from device + self.bot.meshcore.contacts.pop(public_key, None) + self.logger.debug(f"Removed '{contact_name}' from local contacts cache") + await asyncio.sleep(2.0) + try: + await self.bot.meshcore.commands.get_contacts() self.logger.debug(f"Refreshed contacts from device") except Exception as e: self.logger.debug(f"Could not refresh contacts from device: {e}") - - # Wait a bit more after refresh for events to process - await asyncio.sleep(1.0) - - # Check if contact still exists after refresh - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - - if contact_still_exists: - self.logger.warning(f"⚠️ Removal reported success but contact '{contact_name}' still exists in contacts list after refresh") - # Don't mark as failed - the API said it succeeded, device might just be slow - # We'll trust the API response and mark as successful - self.logger.info(f"⚠️ Trusting API success response despite contact still in list (device may be slow to update)") - else: - self.logger.debug(f"✅ Verified: contact '{contact_name}' successfully removed from device") - + # Update tracking database if device removal was successful if device_removal_successful: - # Update complete_contact_tracking to mark as not currently tracked self.db_manager.execute_update( 'UPDATE complete_contact_tracking SET is_currently_tracked = 0 WHERE public_key = ?', (public_key,) ) - - # Log the purge action + self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('companion_purged', ?, ?, ?) ''', (public_key, contact_name, reason)) - + self.logger.info(f"✅ Successfully purged companion {contact_name}: {reason}") - self.logger.debug(f"Companion purge process completed successfully for {contact_name}") return True else: self.logger.error(f"Failed to remove companion {contact_name} from device - not marking as purged in database") - # Log the failed attempt self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('companion_purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False - + except Exception as e: self.logger.error(f"Error purging companion {public_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") return False - - async def _try_fallback_removal_methods(self, public_key: str, contact_name: str, reason: str) -> bool: - """Try alternative methods to remove a contact when the primary MeshCore API fails""" - try: - self.logger.info(f"Trying fallback removal methods for '{contact_name}'...") - - # Method 1: Try direct removal from contacts dictionary - try: - self.logger.info(f"Fallback Method 1: Direct removal from contacts dictionary...") - contact_removed = False - for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): - if contact_data.get('public_key', contact_key) == public_key: - del self.bot.meshcore.contacts[contact_key] - contact_removed = True - self.logger.info(f"✅ Successfully removed contact '{contact_name}' from contacts dictionary") - break - - if contact_removed: - # Verify removal worked - await asyncio.sleep(1) - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - if not contact_still_exists: - return True - else: - self.logger.warning(f"Contact '{contact_name}' still exists after dictionary removal") - except Exception as e: - self.logger.debug(f"Fallback Method 1 failed: {e}") - - # Method 2: Try alternative meshcore-cli commands - try: - self.logger.info(f"Fallback Method 2: Alternative meshcore-cli commands...") - from meshcore_cli.meshcore_cli import next_cmd - - # Try different removal commands (using valid meshcore-cli commands) - alternative_commands = [ - ["remove_contact", public_key], - ["del_contact", public_key] - ] - - for cmd in alternative_commands: - try: - self.logger.info(f"Trying fallback command: {' '.join(cmd)}") - - result = await asyncio.wait_for( - next_cmd(self.bot.meshcore, cmd), - timeout=15.0 - ) - - if result is not None: - self.logger.debug(f"Fallback command {' '.join(cmd)} result: {result}") - - # Verify removal - await asyncio.sleep(1) - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - - if not contact_still_exists: - self.logger.info(f"✅ Fallback command {' '.join(cmd)} succeeded") - return True - else: - self.logger.debug(f"Contact '{contact_name}' still exists after {' '.join(cmd)}") - except Exception as e: - self.logger.debug(f"Fallback command {' '.join(cmd)} failed: {e}") - continue - - except Exception as e: - self.logger.debug(f"Fallback Method 2 failed: {e}") - - # Method 3: Try using contact key instead of public key - try: - self.logger.info(f"Fallback Method 3: Using contact key...") - contact_key = None - for key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', key) == public_key: - contact_key = key - break - - if contact_key: - # Try both meshcore API and CLI commands with contact key - try: - # Try meshcore API with contact key - result = await asyncio.wait_for( - self.bot.meshcore.commands.remove_contact(contact_key), - timeout=15.0 - ) - - if result.type == EventType.OK: - self.logger.info(f"✅ Fallback Method 3 succeeded using contact key via API") - return True - elif result.type == EventType.ERROR: - error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' - if error_code == 2: - self.logger.info(f"✅ Contact not found (already removed) - treating as success") - return True - except Exception as e: - self.logger.debug(f"API removal with contact key failed: {e}") - - # Try CLI command with contact key - try: - from meshcore_cli.meshcore_cli import next_cmd - - result = await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["remove_contact", contact_key]), - timeout=15.0 - ) - - if result is not None: - # Verify removal - await asyncio.sleep(1) - contact_still_exists = any( - contact_data.get('public_key', key) == public_key - for key, contact_data in self.bot.meshcore.contacts.items() - ) - - if not contact_still_exists: - self.logger.info(f"✅ Fallback Method 3 succeeded using contact key via CLI") - return True - except Exception as e: - self.logger.debug(f"CLI removal with contact key failed: {e}") - except Exception as e: - self.logger.debug(f"Fallback Method 3 failed: {e}") - - self.logger.warning(f"All fallback methods failed for '{contact_name}'") - return False - - except Exception as e: - self.logger.error(f"Error in fallback removal methods: {e}") - return False - + async def purge_repeater_by_contact_key(self, contact_key: str, reason: str = "Manual purge") -> bool: - """Remove a repeater using the contact key from the device's contact list""" + """Remove a repeater using the contact key (public_key hex) from the device's contact list""" self.logger.info(f"Starting purge process for contact_key: {contact_key}") self.logger.debug(f"Purge reason: {reason}") - + try: - # Find the contact in meshcore using the contact key - if contact_key not in self.bot.meshcore.contacts: + # meshcore.contacts is keyed by public_key hex - contact_key IS the public_key + contact_data = self.bot.meshcore.contacts.get(contact_key) + if not contact_data: self.logger.warning(f"Contact with key {contact_key} not found in current contacts") return False - - contact_data = self.bot.meshcore.contacts[contact_key] + contact_name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) public_key = contact_data.get('public_key', contact_key) - - self.logger.info(f"Found contact: {contact_name} (key: {contact_key}, public_key: {public_key[:16]}...)") - + + self.logger.info(f"Found contact: {contact_name} (public_key: {public_key[:16]}...)") + # Check if repeater exists in database, if not add it first existing_repeater = self.db_manager.execute_query( 'SELECT id FROM repeater_contacts WHERE public_key = ?', (public_key,) ) - + if not existing_repeater: - # Add repeater to database first device_type = 'Repeater' if contact_data.get('type') == 3: device_type = 'RoomServer' elif 'room' in contact_name.lower() or 'server' in contact_name.lower(): device_type = 'RoomServer' - + self.db_manager.execute_update(''' - INSERT INTO repeater_contacts + INSERT INTO repeater_contacts (public_key, name, device_type, contact_data) VALUES (?, ?, ?, ?) ''', ( @@ -2653,351 +2327,68 @@ class RepeaterManager: device_type, json.dumps(contact_data) )) - + self.logger.info(f"Added repeater {contact_name} to database before purging") - - # Track whether device removal was successful + + # Remove the contact using the proper MeshCore API device_removal_successful = False - - # Try multiple approaches to remove the contact + self.logger.info(f"Removing contact '{contact_name}' from device using MeshCore API...") try: - self.logger.info(f"Starting removal of contact '{contact_name}' from device...") - - # Method 1: Try direct removal from contacts dictionary - try: - self.logger.info(f"Method 1: Attempting direct removal from contacts dictionary...") - if contact_key in self.bot.meshcore.contacts: - del self.bot.meshcore.contacts[contact_key] - self.logger.info(f"Successfully removed contact '{contact_name}' from contacts dictionary") + result = await self.bot.meshcore.commands.remove_contact(public_key) + + if result.type == EventType.OK: + device_removal_successful = True + self.logger.info(f"✅ Successfully removed contact '{contact_name}' from device") + elif result.type == EventType.ERROR: + error_code = result.payload.get('error_code') + reason_str = result.payload.get('reason') + if error_code == 2: + # Device says contact not found - treat as success + self.logger.info(f"✅ Contact '{contact_name}' not found on device (already removed) - treating as success") device_removal_successful = True else: - self.logger.warning(f"Contact '{contact_name}' not found in contacts dictionary") - except Exception as e: - self.logger.warning(f"Direct removal failed: {e}") - - # Method 2: Try using meshcore commands if available (meshcore 2.2+ API) - if not device_removal_successful and hasattr(self.bot.meshcore, 'commands'): - try: - self.logger.info(f"Method 2: Attempting removal via meshcore 2.2+ API...") - # Check if there's a remove_contact method - if hasattr(self.bot.meshcore.commands, 'remove_contact'): - # Try different parameter combinations - # remove_contact accepts: str (hex public key), bytes, or dict (contact object) - removal_keys_to_try = [] - - # Add public_key if it's a valid 64-char hex string - if public_key and len(public_key) == 64: - removal_keys_to_try.append(('public_key', public_key)) - - # Add contact_data dict (meshcore 2.2+ supports dict with public_key field) - if contact_data and isinstance(contact_data, dict): - removal_keys_to_try.append(('contact_data', contact_data)) - - # Add contact_key if it's different and valid - if contact_key and contact_key != public_key and len(contact_key) == 64: - removal_keys_to_try.append(('contact_key', contact_key)) - - for key_name, key_to_try in removal_keys_to_try: - try: - self.logger.debug(f"Trying remove_contact with {key_name}...") - result = await asyncio.wait_for( - self.bot.meshcore.commands.remove_contact(key_to_try), - timeout=30.0 - ) - - # Check if removal was successful (meshcore 2.2+ returns Event object) - if result.type == EventType.OK: - device_removal_successful = True - self.logger.info(f"✅ Successfully removed contact '{contact_name}' via meshcore 2.2+ API using {key_name}") - break - elif result.type == EventType.ERROR: - error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' - # Error code 2 typically means "contact not found" - treat as success - if error_code == 2: - device_removal_successful = True - self.logger.info(f"✅ Contact '{contact_name}' not found (already removed) - treating as success") - break - else: - self.logger.debug(f"remove_contact({key_name}) returned error_code {error_code}, trying next key...") - continue - else: - self.logger.debug(f"remove_contact({key_name}) returned unexpected event type: {result.type}") - continue - - except Exception as e: - self.logger.debug(f"remove_contact({key_name}) failed: {type(e).__name__}: {e}, trying next key...") - continue - - if not device_removal_successful: - self.logger.warning(f"All meshcore 2.2+ API remove_contact attempts failed for '{contact_name}'") - else: - self.logger.info("No remove_contact method found in meshcore commands") - except Exception as e: - self.logger.warning(f"Meshcore commands removal failed: {e}") - - # Method 3: Try CLI as fallback - if not device_removal_successful: - try: - self.logger.info(f"Method 3: Attempting removal via CLI...") - import asyncio - import sys - import io - - # Use asyncio.wait_for to add timeout for LoRa communication - start_time = asyncio.get_event_loop().time() - - # Use the meshcore-cli API for device commands - from meshcore_cli.meshcore_cli import next_cmd - - # Capture stdout/stderr to catch "Unknown contact" messages - old_stdout = sys.stdout - old_stderr = sys.stderr - captured_output = io.StringIO() - captured_errors = io.StringIO() - - try: - sys.stdout = captured_output - sys.stderr = captured_errors - - # Try using the contact key instead of public key - result = await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["remove_contact", contact_key]), - timeout=30.0 # 30 second timeout for LoRa communication - ) - finally: - sys.stdout = old_stdout - sys.stderr = old_stderr - - # Get captured output - stdout_content = captured_output.getvalue() - stderr_content = captured_errors.getvalue() - all_output = stdout_content + stderr_content - - end_time = asyncio.get_event_loop().time() - duration = end_time - start_time - self.logger.info(f"CLI remove command completed in {duration:.2f} seconds") - - # Check if removal was successful - self.logger.debug(f"CLI command result: {result}") - self.logger.debug(f"CLI captured output: {all_output}") - - # Check if the captured output indicates the contact was unknown (doesn't exist) - if "unknown contact" in all_output.lower(): - self.logger.warning(f"CLI: Contact '{contact_name}' was not found on device") - elif result is not None: - self.logger.info(f"CLI: Successfully removed contact '{contact_name}' from device") - device_removal_successful = True - else: - self.logger.warning(f"CLI: Contact removal command returned no result for '{contact_name}'") - - except Exception as e: - self.logger.warning(f"CLI removal failed: {e}") - - # Verify removal and ensure persistence - if device_removal_successful: - # First, manually remove from local cache since the API reported success - # This prevents false negatives if the device hasn't updated its list yet - if contact_key in self.bot.meshcore.contacts: - del self.bot.meshcore.contacts[contact_key] - self.logger.debug(f"Removed '{contact_name}' from local contacts cache") - - await asyncio.sleep(2.0) # Give device time to process and save - - # Try to refresh contacts from device to get latest state - try: - self.logger.debug("Refreshing contacts from device...") - # Use the proper meshcore API to refresh contacts - if hasattr(self.bot.meshcore, 'ensure_contacts'): - await self.bot.meshcore.ensure_contacts(follow=True) - elif hasattr(self.bot.meshcore.commands, 'get_contacts'): - await self.bot.meshcore.commands.get_contacts(timeout=10.0) - else: - # Fallback: use CLI to refresh - from meshcore_cli.meshcore_cli import next_cmd - await asyncio.wait_for( - next_cmd(self.bot.meshcore, ["contacts"]), - timeout=15.0 - ) - self.logger.debug("Contacts refreshed from device") - except Exception as e: - self.logger.debug(f"Could not refresh contacts from device: {e}") - - # Wait a bit more after refresh for events to process - await asyncio.sleep(1.0) - - # Check if contact still exists in the bot's memory after refresh - contact_still_exists = contact_key in self.bot.meshcore.contacts - - if contact_still_exists: - self.logger.warning(f"⚠️ Removal reported success but contact '{contact_name}' still exists in contacts list after refresh") - # Don't mark as failed - the API said it succeeded, device might just be slow - # We'll trust the API response and mark as successful - self.logger.info(f"⚠️ Trusting API success response despite contact still in list (device may be slow to update)") - else: - self.logger.info(f"✅ Verified: Contact '{contact_name}' successfully removed from device") - + self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") except Exception as e: - self.logger.error(f"Failed to remove contact '{contact_name}' from device: {e}") - self.logger.debug(f"Error type: {type(e).__name__}") - device_removal_successful = False - + self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") + + if device_removal_successful: + # Remove from local cache, then refresh from device + self.bot.meshcore.contacts.pop(public_key, None) + self.logger.debug(f"Removed '{contact_name}' from local contacts cache") + await asyncio.sleep(2.0) + try: + await self.bot.meshcore.commands.get_contacts() + self.logger.debug("Contacts refreshed from device") + except Exception as e: + self.logger.debug(f"Could not refresh contacts from device: {e}") + # Only mark as inactive in database if device removal was successful if device_removal_successful: self.db_manager.execute_update( 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', (public_key,) ) - - # Log the purge action + self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purged', ?, ?, ?) ''', (public_key, contact_name, reason)) - + self.logger.info(f"Successfully purged repeater {contact_name}: {reason}") - self.logger.debug(f"Purge process completed successfully for {contact_name}") return True else: self.logger.error(f"Failed to remove repeater {contact_name} from device - not marking as purged in database") - # Log the failed attempt self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False - + except Exception as e: self.logger.error(f"Error purging repeater {contact_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") return False - async def force_purge_repeater_from_contacts(self, public_key: str, reason: str = "Force purge") -> bool: - """Force remove a repeater from device contacts using multiple methods""" - self.logger.info(f"Starting FORCE purge process for public_key: {public_key}") - self.logger.debug(f"Force purge reason: {reason}") - - try: - # Find the contact in meshcore - contact_to_remove = None - contact_name = None - - for contact_key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', contact_key) == public_key: - contact_to_remove = contact_data - contact_name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) - break - - if not contact_to_remove: - self.logger.warning(f"Repeater with public key {public_key} not found in current contacts") - return False - - # Method 1: Try standard removal - self.logger.info(f"Method 1: Attempting standard removal for '{contact_name}'") - success = await self.purge_repeater_from_contacts(public_key, reason) - if success: - self.logger.info(f"Standard removal successful for '{contact_name}'") - return True - - # Method 2: Try alternative removal commands - self.logger.info(f"Method 2: Attempting alternative removal for '{contact_name}'") - try: - from meshcore_cli.meshcore_cli import next_cmd - - # Try different removal commands - alternative_commands = [ - ["delete_contact", public_key], - ["remove", public_key], - ["del", public_key], - ["clear_contact", public_key] - ] - - for cmd in alternative_commands: - try: - self.logger.info(f"Trying command: {' '.join(cmd)}") - - # Capture stdout/stderr to catch "Unknown contact" messages - import sys - import io - old_stdout = sys.stdout - old_stderr = sys.stderr - captured_output = io.StringIO() - captured_errors = io.StringIO() - - try: - sys.stdout = captured_output - sys.stderr = captured_errors - - result = await asyncio.wait_for( - next_cmd(self.bot.meshcore, cmd), - timeout=15.0 - ) - finally: - sys.stdout = old_stdout - sys.stderr = old_stderr - - # Get captured output - stdout_content = captured_output.getvalue() - stderr_content = captured_errors.getvalue() - all_output = stdout_content + stderr_content - - if result is not None: - self.logger.debug(f"Alternative command {' '.join(cmd)} result: {result}") - self.logger.debug(f"Captured output: {all_output}") - - # Check if the captured output indicates the contact was unknown (doesn't exist) - if "unknown contact" in all_output.lower(): - self.logger.warning(f"Contact '{contact_name}' was not found on device - this suggests the contact list is out of sync") - # Don't mark as successful - we need to actually remove contacts that exist - continue # Try next command - else: - self.logger.info(f"Alternative command {' '.join(cmd)} succeeded") - # Verify removal - await asyncio.sleep(1) - contact_still_exists = False - for check_key, check_data in self.bot.meshcore.contacts.items(): - if check_data.get('public_key', check_key) == public_key: - contact_still_exists = True - break - - if not contact_still_exists: - # Mark as purged in database - self.db_manager.execute_update( - 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', - (public_key,) - ) - - self.db_manager.execute_update(''' - INSERT INTO purging_log (action, public_key, name, reason) - VALUES ('force_purged', ?, ?, ?) - ''', (public_key, contact_name, f"{reason} - Alternative command: {' '.join(cmd)}")) - - self.logger.info(f"Force purge successful for '{contact_name}' using {' '.join(cmd)}") - return True - except Exception as e: - self.logger.debug(f"Alternative command {' '.join(cmd)} failed: {e}") - continue - - except Exception as e: - self.logger.error(f"Error with alternative removal methods: {e}") - - # Method 3: Mark as purged anyway and log the issue - self.logger.warning(f"All removal methods failed for '{contact_name}' - marking as purged anyway") - self.db_manager.execute_update( - 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', - (public_key,) - ) - - self.db_manager.execute_update(''' - INSERT INTO purging_log (action, public_key, name, reason) - VALUES ('force_purged_failed', ?, ?, ?) - ''', (public_key, contact_name, f"{reason} - All removal methods failed, marked as purged anyway")) - - return True - - except Exception as e: - self.logger.error(f"Error in force purge for repeater {public_key}: {e}") - return False - async def purge_old_repeaters(self, days_old: int = 30, reason: str = "Automatic purge - old contacts") -> int: """Purge repeaters that haven't been seen in specified days""" try: diff --git a/modules/web_viewer/app.py b/modules/web_viewer/app.py index cc15ff7..8684d5f 100644 --- a/modules/web_viewer/app.py +++ b/modules/web_viewer/app.py @@ -9,6 +9,7 @@ import json import time import configparser import logging +import subprocess import threading from datetime import datetime, timedelta, date from flask import Flask, render_template, jsonify, request, send_from_directory, make_response @@ -85,6 +86,9 @@ class BotDataViewer: use_db = bot_db self.db_path = str(resolve_path(use_db, self.bot_root)) + # Version info for footer (tag or branch/commit/date); computed once at startup + self._version_info = self._get_version_info() + # Setup template context processor for global template variables self._setup_template_context() @@ -148,8 +152,61 @@ class BotDataViewer: config.read(config_path) return config + def _get_version_info(self) -> Dict[str, Optional[str]]: + """Get version info for footer: tag if on a tag, else branch, commit hash and date. + Checks MESHCORE_BOT_VERSION env (Docker/build), then .version_info, then git. Never raises.""" + out = {"tag": None, "branch": None, "commit": None, "date": None} + # Docker / CI: version set at build time (e.g. ARG + ENV in Dockerfile) + env_version = os.environ.get("MESHCORE_BOT_VERSION", "").strip() + if env_version: + out["tag"] = env_version if env_version.startswith("v") else f"v{env_version}" + return out + version_file = self.bot_root / ".version_info" + try: + if version_file.is_file(): + with open(version_file, "r") as f: + data = json.load(f) + # Installer/tag installs write installer_version (often the tag name) + tag = data.get("installer_version") or data.get("tag") + if tag: + out["tag"] = tag if tag.startswith("v") else f"v{tag}" + return out + except (OSError, json.JSONDecodeError, KeyError): + pass + try: + def run(cmd: List[str]) -> Optional[str]: + args = ["git", "-C", str(self.bot_root)] + cmd + result = subprocess.run( + args, capture_output=True, text=True, timeout=5 + ) + if result.returncode != 0: + return None + return (result.stdout or "").strip() or None + + # Check if HEAD is a tag + tag = run(["describe", "--exact-match", "HEAD"]) + if tag: + out["tag"] = tag if tag.startswith("v") else f"v{tag}" + return out + branch = run(["rev-parse", "--abbrev-ref", "HEAD"]) + commit = run(["rev-parse", "--short", "HEAD"]) + date_raw = run(["show", "-s", "--format=%ci", "HEAD"]) + out["branch"] = branch or None + out["commit"] = commit or None + if date_raw: + try: + # %ci is ISO format; take date part only + out["date"] = date_raw.split("T")[0] + except IndexError: + out["date"] = date_raw + return out + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError, OSError): + return out + def _setup_template_context(self): """Setup template context processor to inject global variables""" + version_info = self._version_info + @self.app.context_processor def inject_template_vars(): """Inject variables available to all templates. Never raises so templates always render.""" @@ -166,10 +223,15 @@ class BotDataViewer: bot_name = (self.config.get('Bot', 'bot_name', fallback='MeshCore Bot') or '').strip() or 'MeshCore Bot' except (configparser.NoSectionError, configparser.NoOptionError): bot_name = 'MeshCore Bot' - return dict(greeter_enabled=greeter_enabled, feed_manager_enabled=feed_manager_enabled, bot_name=bot_name) + return dict( + greeter_enabled=greeter_enabled, + feed_manager_enabled=feed_manager_enabled, + bot_name=bot_name, + version_info=version_info, + ) except Exception as e: self.logger.exception("Template context processor failed: %s", e) - return dict(greeter_enabled=False, feed_manager_enabled=False, bot_name='MeshCore Bot') + return dict(greeter_enabled=False, feed_manager_enabled=False, bot_name='MeshCore Bot', version_info=version_info) def _get_db_path(self): """Get the database path, falling back to [Bot] db_path if [Web_Viewer] db_path is unset""" diff --git a/modules/web_viewer/templates/base.html b/modules/web_viewer/templates/base.html index 45f95d1..523ab79 100644 --- a/modules/web_viewer/templates/base.html +++ b/modules/web_viewer/templates/base.html @@ -427,6 +427,15 @@
meshcore-bot + {% if version_info.tag %} + {{ version_info.tag }} + {% elif version_info.branch or version_info.commit or version_info.date %} + + {% if version_info.branch %}{{ version_info.branch }}{% endif %} + {% if version_info.commit %}{% if version_info.branch %} · {% endif %}{{ version_info.commit }}{% endif %} + {% if version_info.date %}{% if version_info.branch or version_info.commit %} · {% endif %}{{ version_info.date }}{% endif %} + + {% endif %} for MeshCore
From f5fedb52fd8f0a5c1436dc8123a85ba5a18f978e Mon Sep 17 00:00:00 2001 From: Adam Gessaman Date: Fri, 20 Feb 2026 13:41:55 -0800 Subject: [PATCH 06/10] Add MQTT upload packet types configuration to `config.ini.example` and update `PacketCaptureService` to handle packet type filtering - Introduced new configuration options for specifying packet types to upload in `config.ini.example`. - Enhanced `PacketCaptureService` to parse and apply these settings, allowing for selective packet type uploads based on user-defined criteria. - Improved logging to provide feedback when packets are skipped due to type filtering. --- config.ini.example | 3 +++ .../service_plugins/packet_capture_service.py | 21 +++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/config.ini.example b/config.ini.example index 1b80872..0372282 100644 --- a/config.ini.example +++ b/config.ini.example @@ -1238,6 +1238,7 @@ mqtt_enabled = true # mqttN_topic_packets = # Packets topic template (uses placeholders below) # mqttN_topic_prefix = # Legacy topic prefix (fallback if topic_status/topic_packets not set) # mqttN_client_id = # MQTT client ID (optional, auto-generated from bot name) +# mqttN_upload_packet_types = # Comma-separated packet types to upload (e.g. 2,4); empty = all # # Topic template placeholders: # {IATA} - Uppercase IATA code (e.g., SEA) @@ -1257,6 +1258,7 @@ mqtt1_topic_status = meshcore/{IATA}/{PUBLIC_KEY}/status mqtt1_topic_packets = meshcore/{IATA}/{PUBLIC_KEY}/packets mqtt1_websocket_path = /mqtt mqtt1_client_id = +mqtt1_upload_packet_types = # MQTT Broker 2 - Let's Mesh Analyzer (EU) mqtt2_enabled = true @@ -1270,6 +1272,7 @@ mqtt2_topic_status = meshcore/{IATA}/{PUBLIC_KEY}/status mqtt2_topic_packets = meshcore/{IATA}/{PUBLIC_KEY}/packets mqtt2_websocket_path = /mqtt mqtt2_client_id = +mqtt2_upload_packet_types = # Stats and status publishing # Enable stats in status messages diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index 0478078..e5229a7 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -231,6 +231,14 @@ class PacketCaptureService(BaseServicePlugin): broker_num += 1 continue + # Parse upload_packet_types: comma-separated list (e.g. "2,4"); empty/unset = upload all + upload_types_raw = config.get('PacketCapture', f'mqtt{broker_num}_upload_packet_types', fallback='').strip() + upload_packet_types = None + if upload_types_raw: + upload_packet_types = frozenset(t.strip() for t in upload_types_raw.split(',') if t.strip()) + if not upload_packet_types: + upload_packet_types = None + broker = { 'enabled': True, 'host': config.get('PacketCapture', server_key, fallback='localhost'), @@ -245,7 +253,8 @@ class PacketCaptureService(BaseServicePlugin): 'transport': config.get('PacketCapture', f'mqtt{broker_num}_transport', fallback='tcp').lower(), 'use_tls': config.getboolean('PacketCapture', f'mqtt{broker_num}_use_tls', fallback=False), 'websocket_path': config.get('PacketCapture', f'mqtt{broker_num}_websocket_path', fallback='/mqtt'), - 'client_id': config.get('PacketCapture', f'mqtt{broker_num}_client_id', fallback=None) + 'client_id': config.get('PacketCapture', f'mqtt{broker_num}_client_id', fallback=None), + 'upload_packet_types': upload_packet_types, } # Set default topic_prefix if not set @@ -1309,7 +1318,15 @@ class PacketCaptureService(BaseServicePlugin): try: client = mqtt_client_info['client'] config = mqtt_client_info['config'] - + + # Per-broker packet type filter: if set, only upload listed types (e.g. 2,4 = TXT_MSG, ADVERT) + upload_types = config.get('upload_packet_types') + if upload_types is not None and packet_info.get('packet_type', '') not in upload_types: + self.logger.debug( + f"Skipping MQTT broker {config.get('host', 'unknown')} (packet type {packet_info.get('packet_type')} not in {sorted(upload_types)})" + ) + continue + # Determine topic topic = None if config.get('topic_packets'): From 54936a806e4fe22bf788956ad257488e3c7197fd Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 20 Feb 2026 15:41:59 -0800 Subject: [PATCH 07/10] Refactor packet logging and enhance MQTT publishing metrics in `PacketCaptureService` - Updated packet logging to indicate whether packets are captured or skipped based on filtering criteria. - Modified the `publish_packet_mqtt` method to return additional metrics, including a flag for packets skipped due to type filtering. - Improved clarity in logging and metrics to aid in diagnosing packet handling behavior. --- modules/service_plugins/packet_capture_service.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index e5229a7..ca3ea43 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -815,7 +815,8 @@ class PacketCaptureService(BaseServicePlugin): publish_metrics = await self.publish_packet_mqtt(formatted_packet) # Log DEBUG level for each packet (verbose; use INFO only for service lifecycle) - self.logger.debug(f"📦 Captured packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})") + action = "Skipping" if publish_metrics.get("skipped_by_filter") else "Captured" + self.logger.debug(f"📦 {action} packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})") # Output full packet data structure in debug mode only (matches original script) if self.debug: @@ -1286,20 +1287,21 @@ class PacketCaptureService(BaseServicePlugin): return topic - async def publish_packet_mqtt(self, packet_info: Dict[str, Any]) -> Dict[str, int]: - """Publish packet to MQTT - returns metrics dict with 'attempted' and 'succeeded' counts. + async def publish_packet_mqtt(self, packet_info: Dict[str, Any]) -> Dict[str, Any]: + """Publish packet to MQTT - returns metrics dict with attempted/succeeded/skipped_by_filter. Args: packet_info: Formatted packet dictionary. Returns: - Dict[str, int]: Dictionary with 'attempted' and 'succeeded' counts. + Dict with 'attempted', 'succeeded' counts and 'skipped_by_filter' (True when + packet type was excluded by mqttN_upload_packet_types for all connected brokers). """ # Always log when function is called (helps diagnose if it's not being invoked) self.logger.debug(f"publish_packet_mqtt called (packet {self.packet_count}, {len(self.mqtt_clients)} clients)") - # Initialize metrics - metrics = {"attempted": 0, "succeeded": 0} + # Initialize metrics (skipped_by_filter: True when packet type excluded by upload_packet_types) + metrics = {"attempted": 0, "succeeded": 0, "skipped_by_filter": False} # Check per-broker connection status (more accurate than global flag) # Don't use early return - let the loop check each broker individually @@ -1322,6 +1324,7 @@ class PacketCaptureService(BaseServicePlugin): # Per-broker packet type filter: if set, only upload listed types (e.g. 2,4 = TXT_MSG, ADVERT) upload_types = config.get('upload_packet_types') if upload_types is not None and packet_info.get('packet_type', '') not in upload_types: + metrics["skipped_by_filter"] = True self.logger.debug( f"Skipping MQTT broker {config.get('host', 'unknown')} (packet type {packet_info.get('packet_type')} not in {sorted(upload_types)})" ) From cb9d8255d7ceac17d6db27b851b19fadfed7038e Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 20 Feb 2026 15:51:23 -0800 Subject: [PATCH 08/10] Enhance web viewer integration with circuit breaker logic and improved error handling - Introduced a circuit breaker mechanism to manage consecutive connection failures when sending updates to the web viewer, preventing log flooding during outages. - Added methods to track send attempts and determine when to skip sending based on the circuit breaker state. - Updated logging to provide feedback on circuit status and failures, improving troubleshooting capabilities. - Refined message handling to ensure efficient communication with the web viewer, reducing unnecessary requests during downtime. --- modules/message_handler.py | 5 +-- modules/web_viewer/integration.py | 58 +++++++++++++++++++++++-------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/modules/message_handler.py b/modules/message_handler.py index 9d7fb69..07cdf55 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -534,8 +534,9 @@ class MessageHandler: advert_data['out_path'] = out_path advert_data['out_path_len'] = out_path_len - # Update mesh graph with edges from the advert path - # Create edge from advertising device to first hop in path + # Update mesh graph with edges from the advert path (one edge per hop). + # This can trigger many send_mesh_edge_update() calls in quick succession; + # if the web viewer is down, that produces a wave of connection-refused logs. if (out_path and out_path_len > 0 and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled): diff --git a/modules/web_viewer/integration.py b/modules/web_viewer/integration.py index 4964c18..30cb2a4 100644 --- a/modules/web_viewer/integration.py +++ b/modules/web_viewer/integration.py @@ -17,10 +17,15 @@ from ..utils import resolve_path class BotIntegration: """Simple bot integration for web viewer compatibility""" + # After this many consecutive connection failures, stop sending until cooldown expires + CIRCUIT_BREAKER_THRESHOLD = 3 + CIRCUIT_BREAKER_COOLDOWN_SEC = 60 + def __init__(self, bot): self.bot = bot self.circuit_breaker_open = False self.circuit_breaker_failures = 0 + self.circuit_breaker_last_failure_time = 0.0 self.is_shutting_down = False # Initialize HTTP session with connection pooling for efficient reuse self._init_http_session() @@ -36,11 +41,10 @@ class BotIntegration: import urllib3 import logging - # Suppress urllib3 connection pool debug messages - # "Resetting dropped connection" is expected behavior when connections are idle - # and the connection pool is working correctly + # Suppress urllib3 connection pool messages when web viewer is unreachable + # Connection refused / Retrying WARNINGs would flood logs during routing bursts urllib3_logger = logging.getLogger('urllib3.connectionpool') - urllib3_logger.setLevel(logging.INFO) # Suppress DEBUG messages + urllib3_logger.setLevel(logging.ERROR) # Also disable other urllib3 warnings urllib3.disable_warnings(urllib3.exceptions.NotOpenSSLWarning) @@ -80,6 +84,30 @@ class BotIntegration: """Reset the circuit breaker""" self.circuit_breaker_open = False self.circuit_breaker_failures = 0 + + def _should_skip_web_viewer_send(self): + """Return True if we should skip sending (circuit open and within cooldown).""" + if not self.circuit_breaker_open: + return False + if (time.time() - self.circuit_breaker_last_failure_time) >= self.CIRCUIT_BREAKER_COOLDOWN_SEC: + self.reset_circuit_breaker() + return False + return True + + def _record_web_viewer_result(self, success): + """Update circuit breaker state after a send attempt.""" + if success: + self.reset_circuit_breaker() + else: + self.circuit_breaker_failures += 1 + self.circuit_breaker_last_failure_time = time.time() + if self.circuit_breaker_failures >= self.CIRCUIT_BREAKER_THRESHOLD: + self.circuit_breaker_open = True + self.bot.logger.debug( + "Web viewer unreachable after %d failures; circuit open for %ds", + self.circuit_breaker_failures, + self.CIRCUIT_BREAKER_COOLDOWN_SEC, + ) def _get_web_viewer_db_path(self): """Return resolved database path for web viewer. Uses [Bot] db_path when [Web_Viewer] db_path is unset.""" @@ -325,6 +353,8 @@ class BotIntegration: def send_mesh_edge_update(self, edge_data): """Send mesh edge update to web viewer via HTTP API""" try: + if self._should_skip_web_viewer_send(): + return # Get web viewer URL from config host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1') port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080) @@ -338,28 +368,27 @@ class BotIntegration: # Use session with connection pooling if available, otherwise fallback to requests.post if self.http_session: try: - # Use a slightly longer timeout to allow connection reuse self.http_session.post(url, json=payload, timeout=1.0) + self._record_web_viewer_result(True) except Exception: - # Silently fail - web viewer might not be running - pass + self._record_web_viewer_result(False) else: - # Fallback if session not initialized import requests try: requests.post(url, json=payload, timeout=1.0) + self._record_web_viewer_result(True) except Exception: - pass + self._record_web_viewer_result(False) except Exception as e: self.bot.logger.debug(f"Error sending mesh edge update to web viewer: {e}") def send_mesh_node_update(self, node_data): """Send mesh node update to web viewer via HTTP API""" try: + if self._should_skip_web_viewer_send(): + return import requests - import json - - # Get web viewer URL from config + host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1') port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080) url = f"http://{host}:{port}/api/stream_data" @@ -369,12 +398,11 @@ class BotIntegration: 'data': node_data } - # Send asynchronously (don't block) try: requests.post(url, json=payload, timeout=0.5) + self._record_web_viewer_result(True) except Exception: - # Silently fail - web viewer might not be running - pass + self._record_web_viewer_result(False) except Exception as e: self.bot.logger.debug(f"Error sending mesh node update to web viewer: {e}") From 5d79f41b4c0f6c73b7e3870e30112f2ce06b0ca5 Mon Sep 17 00:00:00 2001 From: agessaman Date: Sat, 21 Feb 2026 09:10:54 -0800 Subject: [PATCH 09/10] fix footer version formatting --- modules/web_viewer/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/web_viewer/app.py b/modules/web_viewer/app.py index 8684d5f..59fd497 100644 --- a/modules/web_viewer/app.py +++ b/modules/web_viewer/app.py @@ -195,8 +195,8 @@ class BotDataViewer: out["commit"] = commit or None if date_raw: try: - # %ci is ISO format; take date part only - out["date"] = date_raw.split("T")[0] + # %ci is "YYYY-MM-DD HH:MM:SS +tz"; take date part only + out["date"] = date_raw.split()[0] except IndexError: out["date"] = date_raw return out From 99f4ed32d95b90f9960346e19cf0e14a3f39e736 Mon Sep 17 00:00:00 2001 From: agessaman Date: Sat, 21 Feb 2026 09:34:29 -0800 Subject: [PATCH 10/10] Add packet type filtering documentation to `packet-capture.md` --- docs/packet-capture.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/docs/packet-capture.md b/docs/packet-capture.md index 2e2dea1..50293a4 100644 --- a/docs/packet-capture.md +++ b/docs/packet-capture.md @@ -85,6 +85,37 @@ mqtt2_username = user mqtt2_password = pass ``` +#### Filtering by packet type + +You can limit which packet types are uploaded to each broker with `mqttN_upload_packet_types`. Use a comma-separated list of type numbers; if unset or empty, all packet types are uploaded. + +```ini +# Only upload text messages and adverts to this broker +mqtt1_upload_packet_types = 2, 4 + +# Broker 2 gets everything (default) +# mqtt2_upload_packet_types = +``` + +**Packet type reference:** + +| Type | Name | Description | +|------|------------|--------------------| +| 0 | REQ | Request | +| 1 | RESPONSE | Response | +| 2 | TXT_MSG | Text message | +| 3 | ACK | Acknowledgment | +| 4 | ADVERT | Advertisement | +| 5 | GRP_TXT | Group text | +| 6 | GRP_DATA | Group data | +| 7 | ANON_REQ | Anonymous request | +| 8 | PATH | Path | +| 9 | TRACE | Trace | +| 10 | MULTIPART | Multipart | +| 11–15| Type11–RAW_CUSTOM | Other types | + +Packets that are excluded by this filter are still written to the output file (if configured) and still counted; they are only skipped for MQTT upload to that broker. Debug logs will show "Skipping" for those packets. + ### Topic Templates Placeholders: @@ -170,8 +201,9 @@ Common issues: ### No Packets Being Published 1. **Verify MQTT connection** - Check logs for "Connected to MQTT broker" -2. **Check packet count** - Service logs "Captured packet #N" for each packet +2. **Check packet count** - Service logs "Captured packet #N" (or "Skipping packet #N" when filtered) for each packet 3. **Verify topics** - Ensure topics match broker expectations +4. **Check upload filter** - If `mqttN_upload_packet_types` is set, only those types are uploaded. DEBUG Logs show "packet type X not in [Y, Z]" when a packet is skipped ---