#!/usr/bin/env python3
"""
Packet Capture Service for MeshCore Bot

Captures packets from MeshCore radios and outputs to console, file, and MQTT.
Adapted from meshcore-packet-capture project.
"""
import asyncio
import copy
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Optional

# Import meshcore
from meshcore import EventType

# Import bot's enums
from ..enums import PayloadType, PayloadVersion, RouteType

# Import bot's utilities for packet hash
from ..utils import calculate_packet_hash, decode_path_len_byte, parse_trace_payload_route_hashes

# Import MQTT client
try:
    import paho.mqtt.client as mqtt
except ImportError:
    mqtt = None

# Import auth token utilities
# Import base service
import contextlib

from .base_service import BaseServicePlugin
from .packet_capture_utils import create_auth_token_async, read_private_key_file


class PacketCaptureService(BaseServicePlugin):
    """Packet capture service using bot's meshcore connection.

    Captures packets from MeshCore network and publishes to MQTT.
    Supports multiple MQTT brokers, auth tokens, and output to file.
    """

    config_section = 'PacketCapture'  # Explicit config section
    description = "Captures packets from MeshCore network and publishes to MQTT"

    def __init__(self, bot):
        """Initialize packet capture service.

        Args:
            bot: The bot instance.
        """
        super().__init__(bot)
        # Don't store meshcore here - it's None until bot connects
        # Use self.meshcore property to get current connection

        # Setup logging (use bot's formatter and configuration)
        self.logger = logging.getLogger('PacketCaptureService')
        self.logger.setLevel(bot.logger.level)

        # Only setup handlers if none exist to prevent duplicates
        if not self.logger.handlers:
            # Use the same formatter as the bot (colored if enabled)
            # Get formatter from bot's console handler
            bot_formatter = None
            for handler in bot.logger.handlers:
                if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
                    bot_formatter = handler.formatter
                    break

            # If no formatter found, create one matching bot's style
            if not bot_formatter:
                try:
                    import colorlog
                    colored = (bot.config.getboolean('Logging', 'colored_output', fallback=True)
                               if bot.config.has_section('Logging') else True)
                    if colored:
                        bot_formatter = colorlog.ColoredFormatter(
                            '%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S',
                            log_colors={
                                'DEBUG': 'cyan',
                                'INFO': 'green',
                                'WARNING': 'yellow',
                                'ERROR': 'red',
                                'CRITICAL': 'red,bg_white',
                            }
                        )
                    else:
                        bot_formatter = logging.Formatter(
                            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S'
                        )
                except ImportError:
                    # Fallback if colorlog not available
                    bot_formatter = logging.Formatter(
                        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S'
                    )

            # Add console handler with bot's formatter
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(bot_formatter)
            self.logger.addHandler(console_handler)
            # Prevent propagation to root logger to avoid duplicate output
            self.logger.propagate = False

        # Load configuration from bot's config
        self._load_config()

        # Connection state (uses bot's connection, but track our state)
        self.connected = False

        # Packet tracking
        self.packet_count = 0
        self.output_handle = None

        # MQTT
        self.mqtt_clients: list[dict[str, Any]] = []
        self.mqtt_connected = False

        # Stats/status publishing
        self.stats_status_enabled = self.get_config_bool('stats_in_status_enabled', True)
        self.stats_refresh_interval = self.get_config_int('stats_refresh_interval', 300)
        self.latest_stats = None
        self.last_stats_fetch = 0
        self.stats_supported = False
        self.stats_capability_state = None
        self.stats_update_task = None
        self.stats_fetch_lock = asyncio.Lock()
        self.cached_firmware_info = None
        self.radio_info = None

        # Background tasks
        self.background_tasks: list[asyncio.Task] = []
        self.should_exit = False

        # JWT renewal (default: 12 hours, tokens valid for 24 hours)
        self.jwt_renewal_interval = self.get_config_int('jwt_renewal_interval', 43200)

        # Health check
        self.health_check_interval = self.get_config_int('health_check_interval', 30)
        self.health_check_grace_period = self.get_config_int('health_check_grace_period', 2)
        self.health_check_failure_count = 0

        # Event subscriptions (track for cleanup)
        self.event_subscriptions = []

        self.logger.info("Packet capture service initialized")

    def _load_config(self) -> None:
        """Load configuration from bot's config.

        Loads settings for output file, MQTT brokers, auth tokens,
        and other service options.
        """
        config = self.bot.config

        # Check if enabled
        self.enabled = config.getboolean('PacketCapture', 'enabled', fallback=False)

        # Output file
        self.output_file = config.get('PacketCapture', 'output_file', fallback=None)

        # Verbose/debug
        self.verbose = config.getboolean('PacketCapture', 'verbose', fallback=False)
        self.debug = config.getboolean('PacketCapture', 'debug', fallback=False)

        # MQTT configuration
        self.mqtt_enabled = config.getboolean('PacketCapture', 'mqtt_enabled', fallback=True)
        self.mqtt_brokers = self._parse_mqtt_brokers(config)

        # Global IATA
        self.global_iata = config.get('PacketCapture', 'iata', fallback='XYZ').lower()

        # Owner information
        self.owner_public_key = config.get('PacketCapture', 'owner_public_key', fallback=None)
        self.owner_email = config.get('PacketCapture', 'owner_email', fallback=None)

        # Private key for auth tokens (fallback if device signing not available)
        self.private_key_path = config.get('PacketCapture', 'private_key_path', fallback=None)
        self.private_key_hex = None
        if self.private_key_path:
            self.private_key_hex = read_private_key_file(self.private_key_path)
            if not self.private_key_hex:
                self.logger.warning(f"Could not load private key from {self.private_key_path}")

        # Auth token method preference
        self.auth_token_method = config.get('PacketCapture', 'auth_token_method', fallback='device').lower()
        # 'device' = try on-device signing first, fallback to Python
        # 'python' = use Python signing only
        # Note: Python signing can fetch private key from device if not provided via file
        # The create_auth_token_async function will automatically try to export the key
        # from the device if private_key_hex is None and meshcore_instance is available

    def _parse_mqtt_brokers(self, config) -> list[dict[str, Any]]:
        """Parse MQTT broker configuration (mqttN_* format).

        Args:
            config: ConfigParser object containing the configuration.

        Returns:
            List[Dict[str, Any]]: List of configured MQTT broker dictionaries.
        """
        brokers = []

        # Parse multiple brokers (mqtt1_*, mqtt2_*, etc.)
        broker_num = 1
        while True:
            enabled_key = f'mqtt{broker_num}_enabled'
            server_key = f'mqtt{broker_num}_server'
            if not config.has_option('PacketCapture', server_key):
                break

            enabled = config.getboolean('PacketCapture', enabled_key, fallback=True)
            if not enabled:
                broker_num += 1
                continue

            # Parse upload_packet_types: comma-separated list (e.g. "2,4"); empty/unset = upload all
            upload_types_raw = config.get('PacketCapture', f'mqtt{broker_num}_upload_packet_types',
                                          fallback='').strip()
            upload_packet_types = None
            if upload_types_raw:
                upload_packet_types = frozenset(t.strip() for t in upload_types_raw.split(',') if t.strip())
                if not upload_packet_types:
                    upload_packet_types = None

            broker = {
                'enabled': True,
                'host': config.get('PacketCapture', server_key, fallback='localhost'),
                'port': config.getint('PacketCapture', f'mqtt{broker_num}_port', fallback=1883),
                'username': config.get('PacketCapture', f'mqtt{broker_num}_username', fallback=None),
                'password': config.get('PacketCapture', f'mqtt{broker_num}_password', fallback=None),
                'topic_prefix': config.get('PacketCapture', f'mqtt{broker_num}_topic_prefix', fallback=None),
                'topic_status': config.get('PacketCapture', f'mqtt{broker_num}_topic_status', fallback=None),
                'topic_packets': config.get('PacketCapture', f'mqtt{broker_num}_topic_packets', fallback=None),
                'use_auth_token': config.getboolean('PacketCapture', f'mqtt{broker_num}_use_auth_token',
                                                    fallback=False),
                'token_audience': config.get('PacketCapture', f'mqtt{broker_num}_token_audience', fallback=None),
                'transport': config.get('PacketCapture', f'mqtt{broker_num}_transport', fallback='tcp').lower(),
                'use_tls': config.getboolean('PacketCapture', f'mqtt{broker_num}_use_tls', fallback=False),
                'websocket_path': config.get('PacketCapture', f'mqtt{broker_num}_websocket_path',
                                             fallback='/mqtt'),
                'client_id': config.get('PacketCapture', f'mqtt{broker_num}_client_id', fallback=None),
                'upload_packet_types': upload_packet_types,
            }

            # Set default topic_prefix if not set
            if not broker['topic_prefix']:
                broker['topic_prefix'] = 'meshcore/packets'

            brokers.append(broker)
            broker_num += 1

        return brokers

    def get_config_bool(self, key: str, fallback: bool = False) -> bool:
        """Get boolean config value.

        Args:
            key: Config key to retrieve.
            fallback: Default value if key is missing.

        Returns:
            bool: Config value or fallback.
        """
        return self.bot.config.getboolean('PacketCapture', key, fallback=fallback)

    def get_config_int(self, key: str, fallback: int = 0) -> int:
        """Get integer config value.

        Args:
            key: Config key to retrieve.
            fallback: Default value if key is missing.

        Returns:
            int: Config value or fallback.
        """
        return self.bot.config.getint('PacketCapture', key, fallback=fallback)

    def get_config_float(self, key: str, fallback: float = 0.0) -> float:
        """Get float config value.

        Args:
            key: Config key to retrieve.
            fallback: Default value if key is missing.

        Returns:
            float: Config value or fallback.
        """
        return self.bot.config.getfloat('PacketCapture', key, fallback=fallback)

    def get_config_str(self, key: str, fallback: str = '') -> str:
        """Get string config value.

        Args:
            key: Config key to retrieve.
            fallback: Default value if key is missing.

        Returns:
            str: Config value or fallback.
        """
        return self.bot.config.get('PacketCapture', key, fallback=fallback)

    @property
    def meshcore(self):
        """Get meshcore connection from bot (always current).

        Returns:
            MeshCore: The meshcore instance from the bot.
        """
        return self.bot.meshcore if self.bot else None

    def is_healthy(self) -> bool:
        # Healthy only while running AND the bot's radio connection is live.
        return (
            self._running
            and bool(self.meshcore and self.meshcore.is_connected)
        )

    async def start(self) -> None:
        """Start the packet capture service.

        Initializes output file, MQTT connections, and event handlers.
        Waits for bot connection before starting.
        """
        if not self.enabled:
            self.logger.info("Packet capture service is disabled")
            return

        # Wait for bot to be connected (with timeout)
        max_wait = 30  # seconds
        wait_time: float = 0
        while (not self.bot.connected or not self.meshcore) and wait_time < max_wait:
            await asyncio.sleep(0.5)
            wait_time += 0.5

        if not self.bot.connected or not self.meshcore:
            self.logger.warning("Bot not connected after waiting, cannot start packet capture")
            return

        self.logger.info("Starting packet capture service...")

        # Open output file if specified
        if self.output_file:
            try:
                # Explicit encoding so the JSON packet log is identical across platforms
                self.output_handle = open(self.output_file, 'a', encoding='utf-8')
                self.logger.info(f"Writing packets to: {self.output_file}")
            except Exception as e:
                self.logger.error(f"Failed to open output file: {e}")

        # Setup event handlers
        await self.setup_event_handlers()

        # Connect to MQTT brokers
        if self.mqtt_enabled and self._require_mqtt():
            await self.connect_mqtt_brokers()
            # Give MQTT connections a moment to establish
            await asyncio.sleep(2)
            if self.mqtt_connected:
                self.logger.info(f"MQTT connected to {len(self.mqtt_clients)} broker(s)")
            else:
                self.logger.warning("MQTT enabled but no brokers connected")

        # Start background tasks
        await self.start_background_tasks()

        self.connected = True
        self._running = True
        self.logger.info(f"Packet capture service started (MQTT: {'connected' if self.mqtt_connected else 'not connected'})")

    async def stop(self) -> None:
        """Stop the packet capture service.

        Closes output file, disconnects MQTT, and stops background tasks.
        """
        self.logger.info("Stopping packet capture service...")
        self.should_exit = True
        self._running = False
        self.connected = False

        # Cancel background tasks
        for task in self.background_tasks:
            if not task.done():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task

        # Clean up event subscriptions
        self.cleanup_event_subscriptions()

        # Disconnect MQTT
        for mqtt_client_info in self.mqtt_clients:
            try:
                mqtt_client_info['client'].disconnect()
                mqtt_client_info['client'].loop_stop()
            except (AttributeError, RuntimeError, OSError) as e:
                # Silently ignore expected errors during cleanup (client already disconnected, etc.)
                self.logger.debug(f"Error disconnecting MQTT client during cleanup: {e}")
            except Exception as e:
                # Log unexpected errors but don't fail cleanup
                self.logger.warning(f"Unexpected error disconnecting MQTT client: {e}")

        # Close output file
        if self.output_handle:
            self.output_handle.close()
            self.output_handle = None

        self.logger.info(f"Packet capture service stopped. Total packets captured: {self.packet_count}")

    def cleanup_event_subscriptions(self) -> None:
        """Clean up event subscriptions.

        Clears local subscription tracking list.
        """
        # Note: meshcore library handles subscription cleanup automatically
        # This is mainly for tracking/logging
        self.event_subscriptions = []

    async def setup_event_handlers(self) -> None:
        """Setup event handlers for packet capture.

        Subscribes to RX_LOG_DATA and RAW_DATA events.
        """
        if not self.meshcore:
            return

        # Handle RX log data
        async def on_rx_log_data(event, metadata=None):
            await self.handle_rx_log_data(event, metadata)

        # Handle raw data
        async def on_raw_data(event, metadata=None):
            await self.handle_raw_data(event, metadata)

        # Subscribe to events (meshcore supports multiple subscribers)
        self.meshcore.subscribe(EventType.RX_LOG_DATA, on_rx_log_data)
        self.meshcore.subscribe(EventType.RAW_DATA, on_raw_data)
        self.event_subscriptions = [
            (EventType.RX_LOG_DATA, on_rx_log_data),
            (EventType.RAW_DATA, on_raw_data)
        ]

        self.logger.info("Packet capture event handlers registered")

    async def handle_rx_log_data(self, event: Any, metadata: Optional[dict[str, Any]] = None) -> None:
        """Handle RX log data events (matches original script).

        Args:
            event: The RX log data event.
            metadata: Optional metadata dictionary.
        """
        try:
            # Copy payload immediately to avoid segfault if event is freed
            payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
            if payload is None:
                self.logger.warning("RX log data event has no payload")
                return

            if 'snr' in payload:
                # Try to get packet data - prefer 'payload' field, fallback to 'raw_hex'
                # This matches the original script's logic exactly
                raw_hex = None

                # First, try the 'payload' field (already stripped of framing bytes)
                if 'payload' in payload and payload['payload']:
                    raw_hex = payload['payload']
                # Fallback to raw_hex with first 2 bytes stripped
                elif 'raw_hex' in payload and payload['raw_hex']:
                    raw_hex = payload['raw_hex'][4:]  # Skip first 2 bytes (4 hex chars)

                if raw_hex:
                    if self.debug:
                        self.logger.debug(f"Received RX_LOG_DATA: {raw_hex[:50]}...")
                    # Process packet
                    await self.process_packet(raw_hex, payload, metadata)
                else:
                    self.logger.warning(f"RF log data missing both 'payload' and 'raw_hex' fields: {list(payload.keys())}")
        except Exception as e:
            self.logger.error(f"Error handling RX log data: {e}")

    async def handle_raw_data(self, event: Any, metadata: Optional[dict[str, Any]] = None) -> None:
        """Handle raw data events.

        Args:
            event: The raw data event.
            metadata: Optional metadata dictionary.
        """
        try:
            # Copy payload immediately to avoid segfault if event is freed
            payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
            if payload is None:
                self.logger.warning("Raw data event has no payload")
                return

            raw_data = payload.get('data', '')
            if not raw_data:
                return

            # Convert to hex string if needed
            if isinstance(raw_data, bytes):
                raw_hex = raw_data.hex()
            elif isinstance(raw_data, str):
                raw_hex = raw_data
                if raw_hex.startswith('0x'):
                    raw_hex = raw_hex[2:]
            else:
                return

            # Process packet
            await self.process_packet(raw_hex, payload, metadata)
        except Exception as e:
            self.logger.error(f"Error handling raw data: {e}")

    def _format_packet_data(self, raw_hex: str, packet_info: dict[str, Any],
                            payload: dict[str, Any],
                            metadata: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        """Format packet data to match original script's format_packet_data exactly.

        Args:
            raw_hex: Raw hex string of the packet.
            packet_info: Decoded packet information.
            payload: Payload dictionary from the event.
            metadata: Optional metadata dictionary.

        Returns:
            Dict[str, Any]: Formatted packet dictionary.
        """
        current_time = datetime.now()
        timestamp = current_time.isoformat()

        # Remove 0x prefix if present
        clean_raw_hex = raw_hex.replace('0x', '').upper()
        packet_len = len(clean_raw_hex) // 2  # Convert hex string to byte count

        # Map route type to single letter (matches original script)
        route_map = {
            "TRANSPORT_FLOOD": "F",
            "FLOOD": "F",
            "DIRECT": "D",
            "TRANSPORT_DIRECT": "T",
            "UNKNOWN": "U"
        }
        route = route_map.get(packet_info.get('route_type', 'UNKNOWN'), "U")

        # Map payload type to string number (matches original script)
        payload_type_map = {
            "REQ": "0",
            "RESPONSE": "1",
            "TXT_MSG": "2",
            "ACK": "3",
            "ADVERT": "4",
            "GRP_TXT": "5",
            "GRP_DATA": "6",
            "ANON_REQ": "7",
            "PATH": "8",
            "TRACE": "9",
            "MULTIPART": "10",
            "Type11": "11",
            "Type12": "12",
            "Type13": "13",
            "Type14": "14",
            "RAW_CUSTOM": "15",
            "UNKNOWN": "0"
        }
        packet_type = payload_type_map.get(packet_info.get('payload_type', 'UNKNOWN'), "0")

        # Calculate payload length (matches original script logic)
        firmware_payload_len = payload.get('payload_length')
        if firmware_payload_len is not None:
            payload_len = str(firmware_payload_len)
        else:
            # Calculate from packet structure
            path_len = packet_info.get('path_len', 0)
            has_transport = packet_info.get('has_transport_codes', False)
            transport_bytes = 4 if has_transport else 0
            payload_len = str(max(0, packet_len - 1 - transport_bytes - 1 - path_len))

        # Get device name and public key
        device_name = self._get_bot_name()
        if not device_name:
            device_name = "MeshCore Device"

        # Get device public key for origin_id
        origin_id = None
        if self.meshcore and hasattr(self.meshcore, 'self_info'):
            try:
                self_info = self.meshcore.self_info
                if isinstance(self_info, dict):
                    origin_id = self_info.get('public_key', '')
                elif hasattr(self_info, 'public_key'):
                    origin_id = self_info.public_key
                # Convert to hex string if bytes
                if isinstance(origin_id, bytes):
                    origin_id = origin_id.hex()
                elif isinstance(origin_id, bytearray):
                    origin_id = bytes(origin_id).hex()
            except Exception:
                pass

        # Normalize origin_id to uppercase
        origin_id = origin_id.replace('0x', '').replace(' ', '').upper() if origin_id else 'UNKNOWN'

        # Extract RF data
        snr = str(payload.get('snr', 'Unknown'))
        rssi = str(payload.get('rssi', 'Unknown'))

        # Get packet hash - check multiple sources in order of preference, then calculate if needed
        # This matches the original script's approach: use hash from routing_info if available, otherwise calculate
        packet_hash = '0000000000000000'

        # 1. Check if payload has packet_hash directly (from bot's processing)
        if isinstance(payload, dict):
            if 'packet_hash' in payload:
                packet_hash = payload['packet_hash']
            # 2. Check if payload has routing_info with packet_hash
            elif 'routing_info' in payload:
                routing_info = payload.get('routing_info', {})
                if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
                    packet_hash = routing_info['packet_hash']

        # 3. Check metadata if available
        if packet_hash == '0000000000000000' and metadata and isinstance(metadata, dict):
            if 'packet_hash' in metadata:
                packet_hash = metadata['packet_hash']
            elif 'routing_info' in metadata:
                routing_info = metadata.get('routing_info', {})
                if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
                    packet_hash = routing_info['packet_hash']

        # 4. Try to get from bot's recent_rf_data cache (if message_handler has processed it)
        # Note: The bot stores full raw_hex (with framing bytes) for packet_prefix, but we use stripped raw_hex
        # So we need to use the full raw_hex from payload for prefix matching
        # Optimized: Use indexed rf_data_by_pubkey for O(1) lookup instead of linear search
        if packet_hash == '0000000000000000' and hasattr(self.bot, 'message_handler'):
            try:
                message_handler = self.bot.message_handler
                # Get full raw_hex from payload for prefix matching (bot uses full raw_hex for packet_prefix)
                full_raw_hex = payload.get('raw_hex', '')
                if full_raw_hex:
                    packet_prefix = (full_raw_hex.replace('0x', '')[:32]
                                     if len(full_raw_hex.replace('0x', '')) >= 32
                                     else full_raw_hex.replace('0x', ''))
                else:
                    # Fallback: use stripped raw_hex (might not match, but worth trying)
                    clean_raw_hex_for_lookup = raw_hex.replace('0x', '')
                    packet_prefix = (clean_raw_hex_for_lookup[:32]
                                     if len(clean_raw_hex_for_lookup) >= 32
                                     else clean_raw_hex_for_lookup)

                # Use indexed lookup (O(1)) instead of linear search (O(n))
                if hasattr(message_handler, 'rf_data_by_pubkey') and packet_prefix:
                    rf_data_list = message_handler.rf_data_by_pubkey.get(packet_prefix, [])
                    # Check most recent entries first (last in list, since they're appended in order)
                    for rf_data in reversed(rf_data_list):
                        if 'packet_hash' in rf_data:
                            packet_hash = rf_data['packet_hash']
                            break
                        elif 'routing_info' in rf_data:
                            routing_info = rf_data.get('routing_info', {})
                            if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
                                packet_hash = routing_info['packet_hash']
                                break

                # Fallback to linear search only if indexed lookup not available (backward compatibility)
                if packet_hash == '0000000000000000' and hasattr(message_handler, 'recent_rf_data'):
                    for rf_data in message_handler.recent_rf_data:
                        # Match by packet_prefix (bot uses full raw_hex for this)
                        if rf_data.get('packet_prefix') == packet_prefix:
                            if 'packet_hash' in rf_data:
                                packet_hash = rf_data['packet_hash']
                                break
                            elif 'routing_info' in rf_data:
                                routing_info = rf_data.get('routing_info', {})
                                if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
                                    packet_hash = routing_info['packet_hash']
                                    break
            except Exception as e:
                if self.debug:
                    self.logger.debug(f"Error checking recent_rf_data for hash: {e}")

        # 5. Fall back to hash from decoded packet_info (should be calculated correctly)
        if packet_hash == '0000000000000000':
            packet_hash = packet_info.get('packet_hash', '0000000000000000')

        # 6. If still no hash, calculate it from raw_hex (matches original script's format_packet_data)
        if packet_hash == '0000000000000000':
            try:
                # Use payload_type_value from packet_info if available, otherwise None (will be extracted from header)
                payload_type_value = packet_info.get('payload_type_value')
                if payload_type_value is not None:
                    # Ensure it's an integer (handle enum.value if passed)
                    if hasattr(payload_type_value, 'value'):
                        payload_type_value = payload_type_value.value
                    payload_type_value = int(payload_type_value) & 0x0F
                packet_hash = calculate_packet_hash(clean_raw_hex, payload_type_value)
            except Exception as e:
                if self.debug:
                    self.logger.debug(f"Error calculating packet hash: {e}")
                packet_hash = '0000000000000000'

        # Build packet data structure (matches original script exactly)
        packet_data = {
            "origin": device_name,
            "origin_id": origin_id,
            "timestamp": timestamp,
            "type": "PACKET",
            "direction": "rx",
            "time": current_time.strftime("%H:%M:%S"),
            "date": current_time.strftime("%d/%m/%Y"),
            "len": str(packet_len),
            "packet_type": packet_type,
            "route": route,
            "payload_len": payload_len,
            "raw": clean_raw_hex,
            "SNR": snr,
            "RSSI": rssi,
            "hash": packet_hash
        }

        # Add optional fields from payload if present (score, duration, etc.)
        if 'score' in payload:
            packet_data['score'] = str(payload['score'])
        if 'duration' in payload:
            packet_data['duration'] = str(payload['duration'])

        # Add path for route=D (matches original script)
        if route == "D" and packet_info.get('path'):
            packet_data["path"] = ",".join(packet_info['path'])

        return packet_data

    async def process_packet(self, raw_hex: str, payload: dict[str, Any],
                             metadata: Optional[dict[str, Any]] = None) -> None:
        """Process a captured packet.

        Decodes the packet, formats it, writes to file, and publishes to MQTT.

        Args:
            raw_hex: Raw hex string of the packet.
            payload: Payload dictionary from the event.
            metadata: Optional metadata dictionary.
        """
        try:
            self.packet_count += 1

            # Extract packet information (decode may fail, but we still publish)
            packet_info = self.decode_packet(raw_hex, payload)

            # If decode failed, create minimal packet_info with defaults (matches original script)
            if not packet_info:
                if self.debug:
                    self.logger.debug(f"Packet {self.packet_count} decode failed, using defaults (raw_hex: {raw_hex[:50]}...)")
                # Try to calculate packet hash even if decode failed (extract payload_type from header if possible)
                packet_hash = '0000000000000000'
                payload_type_value = None
                try:
                    # Try to extract payload type from header for hash calculation
                    byte_data = bytes.fromhex(raw_hex.replace('0x', ''))
                    if len(byte_data) >= 1:
                        header = byte_data[0]
                        payload_type_value = (header >> 2) & 0x0F
                        packet_hash = calculate_packet_hash(raw_hex.replace('0x', ''), payload_type_value)
                except Exception:
                    pass  # Use default hash if calculation fails

                # Create minimal packet info with defaults (matches original script's format_packet_data)
                packet_info = {
                    'route_type': 'UNKNOWN',
                    'payload_type': 'UNKNOWN',
                    'payload_type_value': payload_type_value or 0,
                    'payload_version': 0,
                    'path_len': 0,
                    'path_hex': '',
                    'path': [],
                    'payload_hex': raw_hex.replace('0x', ''),
                    'payload_bytes': len(raw_hex.replace('0x', '')) // 2,
                    'raw_hex': raw_hex.replace('0x', ''),
                    'packet_hash': packet_hash,
                    'has_transport_codes': False,
                    'transport_codes': None
                }

            # Format packet data to match original script's format
            formatted_packet = self._format_packet_data(raw_hex, packet_info, payload, metadata)

            # Write to file
            if self.output_handle:
                self.output_handle.write(json.dumps(formatted_packet, default=str) + '\n')
                self.output_handle.flush()

            # Publish to MQTT if enabled
            # The publish function will check per-broker connection status
            publish_metrics = {"attempted": 0, "succeeded": 0}
            if self.mqtt_enabled:
                if self.debug:
                    self.logger.debug(f"Calling publish_packet_mqtt for packet {self.packet_count}")
                publish_metrics = await self.publish_packet_mqtt(formatted_packet)

            # Log DEBUG level for each packet (verbose; use INFO only for service lifecycle)
            action = "Skipping" if publish_metrics.get("skipped_by_filter") else "Captured"
            self.logger.debug(f"📦 {action} packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")

            # Output full packet data structure in debug mode only (matches original script)
            if self.debug:
                self.logger.debug("📋 Full packet data structure:")
                self.logger.debug(json.dumps(formatted_packet, indent=2))
        except Exception as e:
            self.logger.error(f"Error processing packet: {e}")

    def decode_packet(self, raw_hex: str, payload: dict[str, Any]) -> Optional[dict[str, Any]]:
        """Decode a MeshCore packet - matches original packet_capture.py functionality.

        Args:
            raw_hex: Raw hex string of the packet.
            payload: Payload dictionary from the event (unused in this method but kept for compatibility).

        Returns:
            Optional[Dict[str, Any]]: Decoded packet info, or None if decoding fails.
        """
        try:
            # Remove 0x prefix if present
            if raw_hex.startswith('0x'):
                raw_hex = raw_hex[2:]

            byte_data = bytes.fromhex(raw_hex)
            if len(byte_data) < 2:
                if self.debug:
                    self.logger.debug(f"Packet too short ({len(byte_data)} bytes), cannot decode")
                return None

            header = byte_data[0]

            # Extract route type
            route_type = RouteType(header & 0x03)
            has_transport = route_type in [RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT]

            # Extract transport codes if present
            transport_codes = None
            offset = 1
            if has_transport and len(byte_data) >= 5:
                transport_bytes = byte_data[1:5]
                transport_codes = {
                    'code1': int.from_bytes(transport_bytes[0:2], byteorder='little'),
                    'code2': int.from_bytes(transport_bytes[2:4], byteorder='little'),
                    'hex': transport_bytes.hex()
                }
                offset = 5

            if len(byte_data) <= offset:
                if self.debug:
                    self.logger.debug(f"Packet too short after transport codes ({len(byte_data)} bytes, offset {offset}), cannot decode")
                return None

            path_len_byte = byte_data[offset]
            offset += 1
            path_byte_length, bytes_per_hop = decode_path_len_byte(path_len_byte)
            if len(byte_data) < offset + path_byte_length:
                if self.debug:
                    self.logger.debug(f"Packet too short for path ({len(byte_data)} bytes, need {offset + path_byte_length}), cannot decode")
                return None

            # Extract path
            path_bytes = byte_data[offset:offset + path_byte_length]
            offset += path_byte_length

            # Chunk path by bytes_per_hop from packet (1, 2, or 3; legacy fallback uses 1)
            hex_chars = bytes_per_hop * 2
            path_hex = path_bytes.hex()
            path_nodes = [path_hex[i:i + hex_chars].upper() for i in range(0, len(path_hex), hex_chars)]
            if (len(path_hex) % hex_chars) != 0 or not path_nodes:
                path_nodes = [path_hex[i:i + 2].upper() for i in range(0, len(path_hex), 2)]

            # Remaining data is payload
            packet_payload = byte_data[offset:]

            # Extract payload version
            payload_version = PayloadVersion((header >> 6) & 0x03)
            if payload_version != PayloadVersion.VER_1:
                if self.debug:
                    self.logger.debug(f"Unsupported payload version: {payload_version} (expected VER_1), skipping")
                return None

            # Extract payload type
            payload_type = PayloadType((header >> 2) & 0x0F)

            # Calculate packet hash (for tracking same message via different paths)
            packet_hash = calculate_packet_hash(raw_hex, payload_type.value)

            # Build packet info (matching original format)
            packet_info = {
                'header': f"0x{header:02x}",
                'route_type': route_type.name,
                'route_type_value': route_type.value,
                'payload_type': payload_type.name,
                'payload_type_value': payload_type.value,
                'payload_version': payload_version.value,
                'path_len': len(path_nodes),
                'path_byte_length': path_byte_length,
                'bytes_per_hop': bytes_per_hop,
                'path_hex': path_hex,
                'path': path_nodes,  # For TRACE, RF path is SNR×4 per hop — replaced below
                'payload_hex': packet_payload.hex(),
                'payload_bytes': len(packet_payload),
                'raw_hex': raw_hex,
                'packet_hash': packet_hash,
                'has_transport_codes': has_transport,
                'transport_codes': transport_codes
            }

            # TRACE: RF path bytes are SNR samples; commanded route is in payload[9:]
            if payload_type == PayloadType.TRACE:
                packet_info['trace_snr_path_hex'] = path_hex.upper()
                trace_route = parse_trace_payload_route_hashes(packet_payload)
                if trace_route:
                    packet_info['path'] = trace_route
                    packet_info['path_len'] = len(trace_route)

            return packet_info
        except Exception as e:
            self.logger.debug(f"Error decoding packet: {e} (raw_hex: {raw_hex[:50]}...)")
            import traceback
            if self.debug:
                self.logger.debug(f"Decode error traceback: {traceback.format_exc()}")
            return None

    def _get_bot_name(self) -> str:
        """Get bot name from device or config.

        Returns:
            str: The name of the bot/device.
        """
        # Try to get name from device first
        if self.meshcore and hasattr(self.meshcore, 'self_info'):
            try:
                self_info = self.meshcore.self_info
                if isinstance(self_info, dict):
                    device_name = self_info.get('name') or self_info.get('adv_name')
                    if device_name:
                        return device_name
                elif hasattr(self_info, 'name'):
                    if self_info.name:
                        return self_info.name
                elif hasattr(self_info, 'adv_name'):
                    if self_info.adv_name:
                        return self_info.adv_name
            except Exception as e:
                self.logger.debug(f"Could not get name from device: {e}")

        # Fallback to config
        bot_name = self.bot.config.get('Bot', 'bot_name', fallback='MeshCoreBot')
        return bot_name

    def _require_mqtt(self) -> bool:
        """Check if MQTT is available and required.

        Returns:
            bool: True if MQTT requirements are met, False otherwise.
        """
        if mqtt is None:
            self.logger.warning(
                "MQTT support not available. Install paho-mqtt: "
                "pip install paho-mqtt"
            )
            return False
        return True

    async def connect_mqtt_brokers(self) -> None:
        """Connect to MQTT brokers.

        Establish connections to all configured MQTT brokers.
        """
""" if not self._require_mqtt(): return # Get bot name for client ID bot_name = self._get_bot_name() for broker_config in self.mqtt_brokers: if not broker_config.get('enabled', True): continue try: # Use configured client_id, or generate from bot name client_id = broker_config.get('client_id') if not client_id: # Sanitize bot name for MQTT client ID (alphanumeric and hyphens only) safe_name = ''.join(c if c.isalnum() or c == '-' else '-' for c in bot_name) client_id = f"{safe_name}-packet-capture-{os.getpid()}" # Create client based on transport type transport = broker_config.get('transport', 'tcp').lower() if transport == 'websockets': try: client = mqtt.Client( client_id=client_id, transport='websockets' ) # Set WebSocket path (must be done before connect) ws_path = broker_config.get('websocket_path', '/mqtt') client.ws_set_options(path=ws_path, headers=None) except Exception as e: self.logger.error(f"WebSockets transport not available: {e}") continue else: client = mqtt.Client(client_id=client_id) # Enable paho-mqtt's built-in automatic reconnection (matches original script) client.reconnect_delay_set(min_delay=1, max_delay=120) # Set TLS if enabled if broker_config.get('use_tls', False): try: import ssl # For WebSockets with TLS (WSS), we need to set TLS on the client # The TLS handshake happens during the WebSocket upgrade client.tls_set(cert_reqs=ssl.CERT_NONE) # Allow self-signed certs if self.debug: self.logger.debug(f"TLS enabled for {broker_config['host']} ({transport})") except Exception as e: self.logger.warning(f"TLS setup failed for {broker_config['host']}: {e}") # Set username/password if provided username = broker_config.get('username') password = broker_config.get('password') if broker_config.get('use_auth_token'): # Use auth token with audience if specified token_audience = broker_config.get('token_audience') or broker_config['host'] # Get device's public key (from self_info) - this is what we use for username and JWT publicKey # The 
owner_public_key is ONLY for the 'owner' field in the JWT payload device_public_key_hex = None if self.meshcore and hasattr(self.meshcore, 'self_info'): try: self_info = self.meshcore.self_info if isinstance(self_info, dict): device_public_key_hex = self_info.get('public_key', '') elif hasattr(self_info, 'public_key'): device_public_key_hex = self_info.public_key # Convert to hex string if bytes if isinstance(device_public_key_hex, bytes): device_public_key_hex = device_public_key_hex.hex() elif isinstance(device_public_key_hex, bytearray): device_public_key_hex = bytes(device_public_key_hex).hex() except Exception as e: self.logger.debug(f"Could not get public key from device: {e}") if not device_public_key_hex: self.logger.warning(f"No device public key available for auth token (broker: {broker_config['host']})") continue # Create auth token (tries on-device signing first if available) use_device = (self.auth_token_method == 'device' and self.meshcore and self.meshcore.is_connected) # For Python signing, we still need meshcore_instance to fetch the private key # The use_device flag only controls whether we try on-device signing first meshcore_for_key_fetch = self.meshcore if self.meshcore and self.meshcore.is_connected else None try: # Use v1_{device_public_key} format for username (device's actual key, not owner key) if not username: username = f"v1_{device_public_key_hex.upper()}" token = await create_auth_token_async( meshcore_instance=meshcore_for_key_fetch, public_key_hex=device_public_key_hex, # Device's actual public key (for JWT publicKey field) private_key_hex=self.private_key_hex, iata=self.global_iata, audience=token_audience, owner_public_key=self.owner_public_key, # Owner's key (only for 'owner' field in JWT) owner_email=self.owner_email, use_device=use_device ) if token: password = token self.logger.debug( f"Created auth token for {broker_config['host']} " f"(username: {username}, valid for 24 hours) " f"using {'device' if use_device else 'Python'} 
signing" ) else: self.logger.warning(f"Failed to create auth token for {broker_config['host']}") except Exception as e: self.logger.error(f"Error creating auth token for {broker_config['host']}: {e}") if username: client.username_pw_set(username, password) # Setup callbacks def on_connect(client, userdata, flags, rc, properties=None): if rc == 0: self.logger.info(f"✓ Connected to MQTT broker: {broker_config['host']}:{broker_config['port']} ({transport})") # Track connection per broker for mqtt_info in self.mqtt_clients: if mqtt_info['client'] == client: mqtt_info['connected'] = True break # Set global connected flag if any broker is connected self.mqtt_connected = any(m.get('connected', False) for m in self.mqtt_clients) else: # MQTT error codes: 0=success, 1=protocol, 2=client, 3=network, 4=transport, 5=auth error_messages = { 1: "protocol version rejected", 2: "client identifier rejected", 3: "server unavailable", 4: "bad username or password", 5: "not authorized" } error_msg = error_messages.get(rc, f"unknown error ({rc})") self.logger.error( f"✗ Failed to connect to MQTT broker {broker_config['host']}: {rc} ({error_msg})" ) # Mark this broker as disconnected for mqtt_info in self.mqtt_clients: if mqtt_info['client'] == client: mqtt_info['connected'] = False break # Update global flag self.mqtt_connected = any(m.get('connected', False) for m in self.mqtt_clients) def on_disconnect(client, userdata, rc, properties=None): # Mark this broker as disconnected for mqtt_info in self.mqtt_clients: if mqtt_info['client'] == client: mqtt_info['connected'] = False if rc != 0: self.logger.warning(f"Disconnected from MQTT broker {broker_config['host']} (rc={rc})") else: self.logger.debug(f"Disconnected from MQTT broker {broker_config['host']}") break # Update global flag self.mqtt_connected = any(m.get('connected', False) for m in self.mqtt_clients) client.on_connect = on_connect client.on_disconnect = on_disconnect # Connect try: host = broker_config['host'] port = 
broker_config['port'] # Validate hostname (basic check) if not host or not host.strip(): self.logger.error(f"Invalid MQTT broker hostname: '{host}'") continue # Try to resolve hostname first (for better error messages) try: import socket socket.gethostbyname(host) except socket.gaierror as dns_error: # Only log DNS errors at debug level, not as errors if self.debug: self.logger.debug(f"DNS resolution check for '{host}': {dns_error}") # Continue anyway - connection attempt will show actual error except Exception as resolve_error: if self.debug: self.logger.debug(f"Hostname resolution check for '{host}': {resolve_error}") # Add client to list BEFORE starting the loop, so callbacks can find it self.mqtt_clients.append({ 'client': client, 'config': broker_config, 'connected': False # Track connection status per broker }) if transport == 'websockets': # WebSocket path already set via ws_set_options above ws_path = broker_config.get('websocket_path', '/mqtt') self.logger.debug(f"Connecting to MQTT broker {host}:{port} via WebSockets (path: {ws_path}, TLS: {broker_config.get('use_tls', False)})") # For WebSockets, connect without path parameter (path set via ws_set_options) # Run connect in executor to avoid blocking the event loop loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, client.connect, host, port, 60) except Exception as connect_error: # Connection failed, but don't block - let loop_start handle retries self.logger.debug(f"Initial connect() call failed (non-blocking): {connect_error}") else: self.logger.debug(f"Connecting to MQTT broker {host}:{port} via TCP (TLS: {broker_config.get('use_tls', False)})") # Run connect in executor to avoid blocking the event loop loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, client.connect, host, port, 60) except Exception as connect_error: # Connection failed, but don't block - let loop_start handle retries self.logger.debug(f"Initial connect() call failed (non-blocking): 
{connect_error}") # Start network loop (non-blocking) client.loop_start() self.logger.info(f"MQTT connection initiated to {host}:{port} ({transport})") # Give connection a moment to establish (especially for WebSockets) await asyncio.sleep(1) except Exception as e: error_msg = str(e) if "nodename nor servname provided" in error_msg or "Name or service not known" in error_msg: # Log DNS errors at debug level, actual connection errors at warning if self.debug: self.logger.debug(f"DNS/Connection error for '{broker_config['host']}': {error_msg}") else: self.logger.warning(f"Could not connect to MQTT broker '{broker_config['host']}' (check network/DNS)") elif "Connection refused" in error_msg: if self.debug: self.logger.debug(f"Connection refused by '{broker_config['host']}:{broker_config['port']}': {error_msg}") else: self.logger.warning(f"Connection refused by MQTT broker '{broker_config['host']}:{broker_config['port']}'") else: if self.debug: self.logger.debug(f"MQTT connection error for '{broker_config['host']}': {error_msg}") else: self.logger.warning(f"Error connecting to MQTT broker '{broker_config['host']}'") except Exception as e: self.logger.error(f"Error setting up MQTT broker: {e}") # Wait a bit for connections to establish await asyncio.sleep(2) # Log summary and publish initial status (matches original script) connected_count = sum(1 for m in self.mqtt_clients if m.get('connected', False)) if connected_count > 0: self.logger.info(f"Connected to {connected_count} MQTT broker(s)") # Publish initial status with firmware version now that MQTT is connected (matches original script) await asyncio.sleep(1) # Give MQTT connections a moment to stabilize await self.publish_status("online") else: self.logger.warning("MQTT enabled but no brokers connected") def _resolve_topic_template(self, template: str, packet_type: str = 'packet') -> Optional[str]: """Resolve topic template with placeholders. Args: template: Topic template string. 
            packet_type: Type of packet ('packet' or 'status'). Currently
                unused by the substitution logic; kept for interface
                compatibility with callers.

        Returns:
            Optional[str]: Resolved topic string, or None if template is empty.
        """
        if not template:
            return None

        # Get device's public key (NOT owner's key - owner key is only for JWT 'owner' field)
        # This matches the original script which uses self.device_public_key from self_info
        device_public_key = None
        if self.meshcore and hasattr(self.meshcore, 'self_info'):
            try:
                self_info = self.meshcore.self_info
                if isinstance(self_info, dict):
                    device_public_key = self_info.get('public_key', '')
                elif hasattr(self_info, 'public_key'):
                    device_public_key = self_info.public_key
                    # Convert to hex string if bytes
                    if isinstance(device_public_key, bytes):
                        device_public_key = device_public_key.hex()
                    elif isinstance(device_public_key, bytearray):
                        device_public_key = bytes(device_public_key).hex()
            except Exception as e:
                self.logger.debug(f"Could not get public key from device: {e}")

        # Normalize to uppercase (remove 0x prefix if present)
        if device_public_key:
            device_public_key = device_public_key.replace('0x', '').replace(' ', '').upper()

        # Replace placeholders (matches original script's resolve_topic_template)
        # Upper/lower placeholder variants map to upper/lower substitutions;
        # 'DEVICE' is the fallback when no usable public key is available.
        topic = template.replace('{IATA}', self.global_iata.upper())
        topic = topic.replace('{iata}', self.global_iata.lower())
        topic = topic.replace('{PUBLIC_KEY}', device_public_key if device_public_key and device_public_key != 'Unknown' else 'DEVICE')
        topic = topic.replace('{public_key}', (device_public_key if device_public_key and device_public_key != 'Unknown' else 'DEVICE').lower())

        return topic

    async def publish_packet_mqtt(self, packet_info: dict[str, Any]) -> dict[str, Any]:
        """Publish packet to MQTT - returns metrics dict with attempted/succeeded/skipped_by_filter.

        Args:
            packet_info: Formatted packet dictionary.

        Returns:
            Dict with 'attempted', 'succeeded' counts and 'skipped_by_filter'
            (True when packet type was excluded by mqttN_upload_packet_types
            for all connected brokers).
""" # Always log when function is called (helps diagnose if it's not being invoked) self.logger.debug(f"publish_packet_mqtt called (packet {self.packet_count}, {len(self.mqtt_clients)} clients)") # Initialize metrics (skipped_by_filter: True when packet type excluded by upload_packet_types) metrics = {"attempted": 0, "succeeded": 0, "skipped_by_filter": False} # Check per-broker connection status (more accurate than global flag) # Don't use early return - let the loop check each broker individually if not self.mqtt_clients: self.logger.debug("No MQTT clients configured, skipping publish") return metrics connected_count = sum(1 for m in self.mqtt_clients if m.get('connected', False)) self.logger.debug(f"Publishing packet to MQTT ({connected_count}/{len(self.mqtt_clients)} brokers connected)") for mqtt_client_info in self.mqtt_clients: # Only publish to connected brokers if not mqtt_client_info.get('connected', False): self.logger.debug(f"Skipping MQTT broker {mqtt_client_info['config'].get('host', 'unknown')} (not connected)") continue try: client = mqtt_client_info['client'] config = mqtt_client_info['config'] # Per-broker packet type filter: if set, only upload listed types (e.g. 
2,4 = TXT_MSG, ADVERT) upload_types = config.get('upload_packet_types') if upload_types is not None and packet_info.get('packet_type', '') not in upload_types: metrics["skipped_by_filter"] = True self.logger.debug( f"Skipping MQTT broker {config.get('host', 'unknown')} (packet type {packet_info.get('packet_type')} not in {sorted(upload_types)})" ) continue # Determine topic topic = None if config.get('topic_packets'): topic = self._resolve_topic_template(config['topic_packets'], 'packet') elif config.get('topic_prefix'): topic = f"{config['topic_prefix']}/packet" else: topic = 'meshcore/packets/packet' if not topic: continue payload = json.dumps(packet_info, default=str) # Log topic and payload size for debugging self.logger.debug(f"Publishing to topic '{topic}' on {config['host']} (payload: {len(payload)} bytes)") # Count as attempted metrics["attempted"] += 1 # Use QoS 0 (matches original script - prevents retry storms) result = client.publish(topic, payload, qos=0) if result.rc == mqtt.MQTT_ERR_SUCCESS: metrics["succeeded"] += 1 self.logger.debug(f"Published packet to MQTT topic '{topic}' on {config['host']} (qos=0)") else: self.logger.warning(f"Failed to publish packet to MQTT topic '{topic}' on {config['host']}: {result.rc} ({mqtt.error_string(result.rc)})") except Exception as e: self.logger.error(f"Error publishing packet to MQTT on {config.get('host', 'unknown')}: {e}") # Log summary if metrics["succeeded"] > 0: self.logger.debug(f"Published packet to {metrics['succeeded']} MQTT broker(s)") elif connected_count == 0: self.logger.debug("No MQTT brokers connected, packet not published") return metrics async def start_background_tasks(self) -> None: """Start background tasks. Initializes scheduler for stats refresh, JWT renewal, health checks, and MQTT reconnection monitor. 
""" # Stats refresh scheduler (matches original script) if self.stats_status_enabled and self.stats_refresh_interval > 0: self.stats_update_task = asyncio.create_task(self.stats_refresh_scheduler()) self.background_tasks.append(self.stats_update_task) # JWT renewal scheduler if self.jwt_renewal_interval > 0: task = asyncio.create_task(self.jwt_renewal_scheduler()) self.background_tasks.append(task) # Health check if self.health_check_interval > 0: task = asyncio.create_task(self.health_check_loop()) self.background_tasks.append(task) # MQTT reconnection monitor (proactive reconnection for failed/disconnected brokers) if self.mqtt_enabled: task = asyncio.create_task(self.mqtt_reconnection_monitor()) self.background_tasks.append(task) async def stats_refresh_scheduler(self) -> None: """Periodically refresh stats and publish them via MQTT (matches original script). Fetches updated radio stats and triggers status publication. """ if self.stats_refresh_interval <= 0 or not self.stats_status_enabled: return while not self.should_exit: try: # Only fetch stats when we're about to publish status if self.mqtt_enabled: connected_count = sum(1 for m in self.mqtt_clients if m.get('connected', False)) if connected_count > 0: await self.publish_status("online", refresh_stats=True) except asyncio.CancelledError: break except Exception as exc: self.logger.debug(f"Stats refresh error: {exc}") if await self._wait_with_shutdown(self.stats_refresh_interval): break async def _wait_with_shutdown(self, timeout: float) -> bool: """Wait for specified time but return immediately if shutdown is requested. Args: timeout: Time to wait in seconds. Returns: bool: True if shutdown requested, False if timeout completed. """ if self.should_exit: return True await asyncio.sleep(timeout) return False def _load_client_version(self) -> str: """Load client version (matches original script). Returns: str: Version string (e.g., 'meshcore-bot/1.0.0-abcdef'). 
""" try: import os import subprocess script_dir = os.path.dirname(os.path.abspath(__file__)) version_file = os.path.join(script_dir, '..', '..', '.version_info') # First try to load from .version_info file (created by installer) if os.path.exists(version_file): with open(version_file) as f: version_data = json.load(f) installer_ver = version_data.get('installer_version', 'unknown') git_hash = version_data.get('git_hash', 'unknown') return f"meshcore-bot/{installer_ver}-{git_hash}" # Fallback: try to get git information directly try: result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'], cwd=os.path.dirname(script_dir), capture_output=True, text=True, timeout=5) if result.returncode == 0: git_hash = result.stdout.strip() return f"meshcore-bot/dev-{git_hash}" except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): pass except Exception as e: self.logger.debug(f"Could not load version info: {e}") # Final fallback return "meshcore-bot/unknown" async def get_firmware_info(self) -> dict[str, str]: """Get firmware information from meshcore device (matches original script). Returns: Dict[str, str]: Dictionary containing 'model' and 'version'. 
""" try: # During shutdown, always use cached info - don't query the device if self.should_exit: if self.cached_firmware_info: self.logger.debug("Using cached firmware info (shutdown in progress)") return self.cached_firmware_info else: return {"model": "unknown", "version": "unknown"} # Always use cached info if available - firmware info doesn't change during runtime if self.cached_firmware_info: if self.debug: self.logger.debug("Using cached firmware info") return self.cached_firmware_info # Only query if we don't have cached info if not self.meshcore or not self.meshcore.is_connected: return {"model": "unknown", "version": "unknown"} self.logger.debug("Querying device for firmware info...") # Use send_device_query() to get firmware version result = await self.meshcore.commands.send_device_query() if result is None: self.logger.debug("Device query failed") return {"model": "unknown", "version": "unknown"} if result.type == EventType.ERROR: self.logger.debug(f"Device query failed: {result}") return {"model": "unknown", "version": "unknown"} if result.payload: payload = result.payload self.logger.debug(f"Device query payload: {payload}") # Check firmware version format fw_ver = payload.get('fw ver', 0) self.logger.debug(f"Firmware version number: {fw_ver}") if fw_ver >= 3: # For newer firmware versions (v3+) model = payload.get('model', 'Unknown') version = payload.get('ver', 'Unknown') build_date = payload.get('fw_build', 'Unknown') # Remove 'v' prefix from version if it already has one if version.startswith('v'): version = version[1:] version_str = f"v{version} (Build: {build_date})" self.logger.debug(f"New firmware format - Model: {model}, Version: {version_str}") firmware_info = {"model": model, "version": version_str} self.cached_firmware_info = firmware_info # Cache the result return firmware_info else: # For older firmware versions version_str = f"v{fw_ver}" self.logger.debug(f"Old firmware format - Model: unknown, Version: {version_str}") firmware_info = 
{"model": "unknown", "version": version_str} self.cached_firmware_info = firmware_info # Cache the result return firmware_info self.logger.debug("No payload in device query result") return {"model": "unknown", "version": "unknown"} except Exception as e: self.logger.debug(f"Error getting firmware info: {e}") return {"model": "unknown", "version": "unknown"} def stats_commands_available(self) -> bool: """Detect whether the connected meshcore build exposes stats commands (matches original script). Returns: bool: True if stats commands are available. """ if not self.meshcore or not hasattr(self.meshcore, "commands"): return False commands = self.meshcore.commands required = ["get_stats_core", "get_stats_radio"] available = all(callable(getattr(commands, attr, None)) for attr in required) state = "available" if available else "missing" if state != self.stats_capability_state: if available: self.logger.info("MeshCore stats commands detected - status messages will include device stats") else: self.logger.info("MeshCore stats commands not available - skipping stats in status messages") self.stats_capability_state = state self.stats_supported = available return available async def refresh_stats(self, force: bool = False) -> Optional[dict[str, Any]]: """Fetch stats from the radio and cache them for status publishing (matches original script). Args: force: Force refresh even if cache is fresh. Returns: Optional[Dict[str, Any]]: Dictionary of stats or None if unavailable. 
""" if not self.stats_status_enabled: if self.debug: self.logger.debug("Stats refresh skipped: stats_status_enabled is False") return None if not self.meshcore or not self.meshcore.is_connected: return None if self.stats_refresh_interval <= 0: if self.debug: self.logger.debug("Stats refresh skipped: stats_refresh_interval is 0 or negative") return None if not self.stats_commands_available(): if self.debug: self.logger.debug("Stats refresh skipped: stats commands not available") return None now = time.time() if ( not force and self.latest_stats and (now - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2) ): return dict(self.latest_stats) async with self.stats_fetch_lock: # Another coroutine may have completed the refresh while we waited if ( not force and self.latest_stats and (time.time() - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2) ): return dict(self.latest_stats) stats_payload = {} try: core_result = await self.meshcore.commands.get_stats_core() if core_result and core_result.type == EventType.STATS_CORE and core_result.payload: stats_payload.update(core_result.payload) elif core_result and core_result.type == EventType.ERROR: self.logger.debug(f"Core stats unavailable: {core_result.payload}") except Exception as exc: self.logger.debug(f"Error fetching core stats: {exc}") try: radio_result = await self.meshcore.commands.get_stats_radio() if radio_result and radio_result.type == EventType.STATS_RADIO and radio_result.payload: stats_payload.update(radio_result.payload) elif radio_result and radio_result.type == EventType.ERROR: self.logger.debug(f"Radio stats unavailable: {radio_result.payload}") except Exception as exc: self.logger.debug(f"Error fetching radio stats: {exc}") if stats_payload: self.latest_stats = stats_payload self.last_stats_fetch = time.time() if self.debug: self.logger.debug(f"Updated stats cache: {self.latest_stats}") elif self.debug: self.logger.debug("Stats refresh completed but returned no data") 
return dict(self.latest_stats) if self.latest_stats else None async def publish_status(self, status: str, refresh_stats: bool = True) -> None: """Publish status with additional information (matches original script exactly). Args: status: Status string (e.g., 'online', 'offline'). refresh_stats: Whether to refresh stats before publishing. """ firmware_info = await self.get_firmware_info() # Get device name and public key device_name = self._get_bot_name() if not device_name: device_name = "MeshCore Device" # Get device public key for origin_id device_public_key = None if self.meshcore and hasattr(self.meshcore, 'self_info'): try: self_info = self.meshcore.self_info if isinstance(self_info, dict): device_public_key = self_info.get('public_key', '') elif hasattr(self_info, 'public_key'): device_public_key = self_info.public_key # Convert to hex string if bytes if isinstance(device_public_key, bytes): device_public_key = device_public_key.hex() elif isinstance(device_public_key, bytearray): device_public_key = bytes(device_public_key).hex() except Exception: pass # Normalize origin_id to uppercase if device_public_key: device_public_key = device_public_key.replace('0x', '').replace(' ', '').upper() else: device_public_key = 'DEVICE' # Get radio info if available if not self.radio_info and self.meshcore and hasattr(self.meshcore, 'self_info'): try: self_info = self.meshcore.self_info radio_freq = self_info.get('radio_freq', 0) if isinstance(self_info, dict) else getattr(self_info, 'radio_freq', 0) radio_bw = self_info.get('radio_bw', 0) if isinstance(self_info, dict) else getattr(self_info, 'radio_bw', 0) radio_sf = self_info.get('radio_sf', 0) if isinstance(self_info, dict) else getattr(self_info, 'radio_sf', 0) radio_cr = self_info.get('radio_cr', 0) if isinstance(self_info, dict) else getattr(self_info, 'radio_cr', 0) self.radio_info = f"{radio_freq},{radio_bw},{radio_sf},{radio_cr}" except Exception: pass status_msg = { "status": status, "timestamp": 
datetime.now().isoformat(), "origin": device_name, "origin_id": device_public_key, "model": firmware_info.get('model', 'unknown'), "firmware_version": firmware_info.get('version', 'unknown'), "radio": self.radio_info or "unknown", "client_version": self._load_client_version() } # Attach stats (online status only) if supported and enabled if ( status.lower() == "online" and self.stats_status_enabled ): stats_payload = None if refresh_stats: # Always force refresh stats right before publishing to ensure fresh data stats_payload = await self.refresh_stats(force=True) if not stats_payload: self.logger.debug("Stats refresh returned no data - stats will not be included in status message") elif self.latest_stats: stats_payload = dict(self.latest_stats) if stats_payload: status_msg["stats"] = stats_payload elif self.debug: self.logger.debug("No stats payload available - status message will not include stats") # Publish status to all connected brokers for mqtt_client_info in self.mqtt_clients: # Only publish to connected brokers if not mqtt_client_info.get('connected', False): continue try: client = mqtt_client_info['client'] config = mqtt_client_info['config'] # Determine topic topic = None if config.get('topic_status'): topic = self._resolve_topic_template(config['topic_status'], 'status') elif config.get('topic_prefix'): topic = f"{config['topic_prefix']}/status" else: topic = 'meshcore/status' if not topic: continue payload = json.dumps(status_msg, default=str) # Use QoS 0 with retain=True for status (matches original script) result = client.publish(topic, payload, qos=0, retain=True) if result.rc == mqtt.MQTT_ERR_SUCCESS: if self.debug: self.logger.debug(f"Published status: {status}") else: self.logger.warning(f"Failed to publish status to MQTT topic '{topic}': {result.rc}") except Exception as e: self.logger.error(f"Error publishing status to MQTT: {e}") async def jwt_renewal_scheduler(self) -> None: """Background task to proactively renew JWT tokens before 
expiration. Renews auth tokens for all MQTT brokers that use token authentication. Runs every jwt_renewal_interval seconds (default: 12 hours). Tokens are valid for 24 hours, so this provides a 12-hour buffer. """ if self.jwt_renewal_interval <= 0: return while not self.should_exit: try: await asyncio.sleep(self.jwt_renewal_interval) if self.should_exit: break # Renew tokens for all MQTT brokers that use auth tokens for mqtt_client_info in self.mqtt_clients: config = mqtt_client_info['config'] client = mqtt_client_info['client'] # Only renew for brokers that use auth tokens if not config.get('use_auth_token'): continue try: broker_host = config.get('host', 'unknown') self.logger.debug(f"Renewing auth token for MQTT broker {broker_host}...") # Get device's public key device_public_key_hex = None if self.meshcore and hasattr(self.meshcore, 'self_info'): try: self_info = self.meshcore.self_info if isinstance(self_info, dict): device_public_key_hex = self_info.get('public_key', '') elif hasattr(self_info, 'public_key'): device_public_key_hex = self_info.public_key # Convert to hex string if bytes if isinstance(device_public_key_hex, bytes): device_public_key_hex = device_public_key_hex.hex() elif isinstance(device_public_key_hex, bytearray): device_public_key_hex = bytes(device_public_key_hex).hex() except Exception as e: self.logger.debug(f"Could not get public key from device: {e}") if not device_public_key_hex: self.logger.warning(f"No device public key available for token renewal (broker: {broker_host})") continue # Create new auth token token_audience = config.get('token_audience') or broker_host username = f"v1_{device_public_key_hex.upper()}" use_device = (self.auth_token_method == 'device' and self.meshcore and self.meshcore.is_connected) meshcore_for_key_fetch = self.meshcore if self.meshcore and self.meshcore.is_connected else None token = await create_auth_token_async( meshcore_instance=meshcore_for_key_fetch, public_key_hex=device_public_key_hex, 
private_key_hex=self.private_key_hex, iata=self.global_iata, audience=token_audience, owner_public_key=self.owner_public_key, owner_email=self.owner_email, use_device=use_device ) if token: # Update client credentials with new token client.username_pw_set(username, token) self.logger.info(f"✓ Renewed auth token for MQTT broker {broker_host} (valid for 24 hours)") else: self.logger.warning(f"Failed to renew auth token for MQTT broker {broker_host}") except Exception as e: self.logger.error(f"Error renewing token for MQTT broker {config.get('host', 'unknown')}: {e}") except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Error in JWT renewal scheduler: {e}") await asyncio.sleep(60) async def health_check_loop(self) -> None: """Background task for health checks. Monitors connection status and warns on failures. """ if self.health_check_interval <= 0: return while not self.should_exit: try: await asyncio.sleep(self.health_check_interval) if not self.meshcore or not self.meshcore.is_connected: self.health_check_failure_count += 1 if self.health_check_failure_count >= self.health_check_grace_period: self.logger.warning("Health check failed - connection lost") else: self.health_check_failure_count = 0 except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Error in health check loop: {e}") await asyncio.sleep(60) async def mqtt_reconnection_monitor(self) -> None: """Proactive MQTT reconnection monitor - checks and reconnects disconnected brokers. Periodically checks connectivity of all configured MQTT brokers and attempts reconnection if disconnected. 
""" if not self.mqtt_enabled: return # Reconnection check interval (check every 30 seconds) check_interval = 30 while not self.should_exit: try: await asyncio.sleep(check_interval) if not self.mqtt_clients: continue # Check each broker's connection status for mqtt_client_info in self.mqtt_clients: client = mqtt_client_info['client'] config = mqtt_client_info['config'] broker_host = config.get('host', 'unknown') # Check if client is connected if not client.is_connected(): # Client is disconnected - attempt reconnection try: self.logger.info(f"MQTT broker {broker_host} is disconnected, attempting reconnection...") # If using auth tokens, try to renew the token before reconnecting if config.get('use_auth_token'): # Get device's public key for username device_public_key_hex = None if self.meshcore and hasattr(self.meshcore, 'self_info'): try: self_info = self.meshcore.self_info if isinstance(self_info, dict): device_public_key_hex = self_info.get('public_key', '') elif hasattr(self_info, 'public_key'): device_public_key_hex = self_info.public_key # Convert to hex string if bytes if isinstance(device_public_key_hex, bytes): device_public_key_hex = device_public_key_hex.hex() elif isinstance(device_public_key_hex, bytearray): device_public_key_hex = bytes(device_public_key_hex).hex() except Exception: pass if device_public_key_hex: # Create new auth token token_audience = config.get('token_audience') or broker_host username = f"v1_{device_public_key_hex.upper()}" use_device = (self.auth_token_method == 'device' and self.meshcore and self.meshcore.is_connected) meshcore_for_key_fetch = self.meshcore if self.meshcore and self.meshcore.is_connected else None try: token = await create_auth_token_async( meshcore_instance=meshcore_for_key_fetch, public_key_hex=device_public_key_hex, private_key_hex=self.private_key_hex, iata=self.global_iata, audience=token_audience, owner_public_key=self.owner_public_key, owner_email=self.owner_email, use_device=use_device ) if token: # 
Update credentials client.username_pw_set(username, token) self.logger.debug(f"Renewed auth token for {broker_host} before reconnection") except Exception as e: self.logger.debug(f"Error renewing auth token for {broker_host}: {e}") # Attempt reconnection (non-blocking to avoid blocking event loop) config['host'] config['port'] loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, client.reconnect) except Exception as reconnect_error: # Reconnection failed, but don't block - will retry on next cycle self.logger.debug(f"Reconnect() call failed (non-blocking): {reconnect_error}") # Give it a moment to connect await asyncio.sleep(2) # Check if reconnection succeeded if client.is_connected(): self.logger.info(f"✓ Successfully reconnected to MQTT broker {broker_host}") mqtt_client_info['connected'] = True # Update global flag self.mqtt_connected = any(m.get('connected', False) for m in self.mqtt_clients) else: if self.debug: self.logger.debug(f"Reconnection attempt to {broker_host} still in progress or failed") except Exception as e: self.logger.debug(f"Error attempting MQTT reconnection to {broker_host}: {e}") except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Error in MQTT reconnection monitor: {e}") await asyncio.sleep(60)