From c08dba316bb366ff25af0853ce4def586195df37 Mon Sep 17 00:00:00 2001 From: agessaman Date: Sun, 1 Feb 2026 16:59:55 -0800 Subject: [PATCH] feat: Add contact and channel database adapters for pymc_core integration - Introduced ContactBookAdapter and ChannelDatabaseAdapter classes to facilitate interaction with the bot's database for contact and channel information. - Implemented caching mechanisms for efficient retrieval of contacts and channels. - Enhanced channel handling by including public and common hashtag channels, along with custom channels from the database. - Added error handling and logging for improved debugging and operational transparency. --- modules/meshtnc_serial.py | 458 +++++++++++++++++++++++ modules/pymc_connection.py | 749 +++++++++++++++++++++++++++++-------- 2 files changed, 1051 insertions(+), 156 deletions(-) create mode 100644 modules/meshtnc_serial.py diff --git a/modules/meshtnc_serial.py b/modules/meshtnc_serial.py new file mode 100644 index 0000000..1fd6133 --- /dev/null +++ b/modules/meshtnc_serial.py @@ -0,0 +1,458 @@ +""" +MeshTNC Serial Wrapper - CLI Mode with RXLOG parsing + +This wrapper uses MeshTNC's CLI mode instead of KISS mode as a workaround +for the KISS RX bug where received packets are not forwarded to serial. + +In CLI mode: +- RX packets come as RXLOG lines: "timestamp,RXLOG,rssi,snr,hex_data" +- TX packets are sent via KISS mode (temporarily switch, send, switch back) +""" + +import asyncio +import logging +import threading +import time +from collections import deque +from typing import Any, Callable, Dict, Optional + +import serial + +logger = logging.getLogger("MeshTNCSerial") + +# KISS Protocol Constants (for TX only) +KISS_FEND = 0xC0 +KISS_FESC = 0xDB +KISS_TFEND = 0xDC +KISS_TFESC = 0xDD + + +class MeshTNCSerial: + """ + MeshTNC Serial Interface using CLI mode for RX and KISS for TX. + + Implements the same interface as KissSerialWrapper for compatibility + with pymc_core's Dispatcher. + """ + + def __init__( + self, + port: str, + baudrate: int = 115200, + timeout: float = 1.0, + radio_config: Optional[Dict[str, Any]] = None, + on_frame_received: Optional[Callable[[bytes], None]] = None, + ): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.radio_config = radio_config or {} + + self.serial_conn: Optional[serial.Serial] = None + self.is_connected = False + self.kiss_mode_active = False + + # RX handling + self.on_frame_received = on_frame_received + self.rx_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + + # TX queue for KISS frames + self.tx_queue = deque(maxlen=100) + self.tx_thread: Optional[threading.Thread] = None + self.tx_lock = threading.Lock() + + # Stats + self.stats = { + "frames_sent": 0, + "frames_received": 0, + "bytes_sent": 0, + "bytes_received": 0, + "last_rssi": None, + "last_snr": None, + } + + # Line buffer for CLI mode parsing + self._line_buffer = "" + + # Event loop reference for thread-safe callbacks + self._loop: Optional[asyncio.AbstractEventLoop] = None + + def connect(self) -> bool: + """Connect to MeshTNC and configure radio.""" + try: + self.serial_conn = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + ) + + self.is_connected = True + self.stop_event.clear() + + logger.info(f"MeshTNC connected to {self.port} at {self.baudrate} baud") + + # Exit any existing KISS mode + self._exit_kiss_mode() + time.sleep(0.3) + self._clear_buffer() + + # Configure radio + if self.radio_config: + if not self._configure_radio(): + logger.error("Failed to configure radio") + return False + + # Enable rxlog for receiving packets + self._send_command("rxlog on") + time.sleep(0.2) + self._clear_buffer() + + # Start RX thread (CLI mode parsing) + self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True) + self.rx_thread.start() + + # Start TX thread + self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True) + self.tx_thread.start() + + logger.info("MeshTNC configured in CLI mode with rxlog enabled") + return True + + except Exception as e: + logger.error(f"Failed to connect to MeshTNC: {e}") + self.is_connected = False + return False + + def disconnect(self): + """Disconnect from MeshTNC.""" + self.is_connected = False + self.stop_event.set() + + if self.rx_thread and self.rx_thread.is_alive(): + self.rx_thread.join(timeout=2.0) + if self.tx_thread and self.tx_thread.is_alive(): + self.tx_thread.join(timeout=2.0) + + if self.serial_conn and self.serial_conn.is_open: + try: + self._send_command("rxlog off") + except: + pass + self.serial_conn.close() + + logger.info("MeshTNC disconnected") + + def _configure_radio(self) -> bool: + """Configure radio settings via CLI.""" + try: + freq_hz = self.radio_config.get("frequency", 910525000) + bw_hz = self.radio_config.get("bandwidth", 62500) + sf = self.radio_config.get("spreading_factor", 7) + cr = self.radio_config.get("coding_rate", 5) + sync_word = self.radio_config.get("sync_word", 0x12) + + # Convert to CLI format + freq_mhz = freq_hz / 1_000_000 + bw_khz = bw_hz / 1000 + + if isinstance(sync_word, int): + sync_str = f"0x{sync_word:02X}" + else: + sync_str = str(sync_word) + + cmd = f"set radio {freq_mhz},{bw_khz},{sf},{cr},{sync_str}" + logger.info(f"Configuring radio: {cmd}") + + response = self._send_command(cmd) + logger.info(f"Radio config response: {response}") + + return True + + except Exception as e: + logger.error(f"Radio configuration error: {e}") + return False + + def _send_command(self, cmd: str) -> str: + """Send a CLI command and return response.""" + if not self.serial_conn or not self.serial_conn.is_open: + return "" + + try: + self.serial_conn.write(f"{cmd}\r\n".encode('ascii')) + self.serial_conn.flush() + time.sleep(0.3) + + response = "" + if self.serial_conn.in_waiting > 0: + response = self.serial_conn.read( + self.serial_conn.in_waiting + ).decode('utf-8', errors='ignore') + + return response.strip() + + except Exception as e: + logger.error(f"Command error: {e}") + return "" + + def _clear_buffer(self): + """Clear serial buffer.""" + if self.serial_conn and self.serial_conn.is_open: + try: + if self.serial_conn.in_waiting > 0: + self.serial_conn.read(self.serial_conn.in_waiting) + except: + pass + + def _exit_kiss_mode(self): + """Exit KISS mode if active.""" + if self.serial_conn and self.serial_conn.is_open: + try: + self.serial_conn.write(bytes([KISS_FEND, 0xFF, KISS_FEND])) + self.serial_conn.flush() + self.kiss_mode_active = False + except: + pass + + def _rx_worker(self): + """Background thread for receiving and parsing RXLOG lines.""" + logger.info("RX worker started (CLI/RXLOG mode)") + + while not self.stop_event.is_set() and self.is_connected: + try: + if self.serial_conn and self.serial_conn.in_waiting > 0: + data = self.serial_conn.read(self.serial_conn.in_waiting) + text = data.decode('utf-8', errors='ignore') + self._line_buffer += text + + # Process complete lines + while '\n' in self._line_buffer: + line, self._line_buffer = self._line_buffer.split('\n', 1) + line = line.strip() + if line: + self._process_line(line) + else: + time.sleep(0.01) + + except Exception as e: + if self.is_connected: + logger.error(f"RX worker error: {e}") + break + + logger.info("RX worker stopped") + + def _process_line(self, line: str): + """Process a CLI output line, looking for RXLOG entries.""" + # RXLOG format: timestamp,RXLOG,rssi,snr,hex_data + # Example: 946688557,RXLOG,-91.00,10.00,15125A077ECD3CE859BF... + + if ',RXLOG,' not in line: + return + + try: + parts = line.split(',') + if len(parts) >= 5 and parts[1] == 'RXLOG': + rssi = float(parts[2]) + snr = float(parts[3]) + hex_data = parts[4] + + # Convert hex to bytes + packet_data = bytes.fromhex(hex_data) + + # Update stats + self.stats["frames_received"] += 1 + self.stats["bytes_received"] += len(packet_data) + self.stats["last_rssi"] = rssi + self.stats["last_snr"] = snr + + logger.debug(f"RX packet: {len(packet_data)} bytes, RSSI={rssi}, SNR={snr}") + + # Call the callback (thread-safe) + if self.on_frame_received and len(packet_data) > 0: + self._invoke_callback(packet_data) + + except Exception as e: + logger.debug(f"Failed to parse RXLOG line: {line} - {e}") + + def _invoke_callback(self, packet_data: bytes): + """Invoke the RX callback in a thread-safe manner.""" + if not self.on_frame_received: + return + + try: + # Try to get the running loop (works if called from async context) + try: + loop = asyncio.get_running_loop() + # We're in an async context, create task directly + loop.create_task(self._async_callback_wrapper(packet_data)) + return + except RuntimeError: + pass + + # Use stored loop reference if available + if self._loop and self._loop.is_running(): + # Schedule callback on the event loop from this thread + self._loop.call_soon_threadsafe( + lambda: self._loop.create_task( + self._async_callback_wrapper(packet_data) + ) + ) + else: + # Fallback: direct call (may not work with async callbacks) + self.on_frame_received(packet_data) + + except Exception as e: + logger.error(f"Error invoking callback: {e}") + + async def _async_callback_wrapper(self, packet_data: bytes): + """Async wrapper to invoke the callback.""" + try: + self.on_frame_received(packet_data) + except Exception as e: + logger.error(f"Error in frame received callback: {e}") + + def _tx_worker(self): + """Background thread for transmitting packets.""" + logger.info("TX worker started") + + while not self.stop_event.is_set() and self.is_connected: + try: + if self.tx_queue: + with self.tx_lock: + if self.tx_queue: + packet = self.tx_queue.popleft() + self._transmit_packet(packet) + else: + time.sleep(0.01) + + except Exception as e: + if self.is_connected: + logger.error(f"TX worker error: {e}") + break + + logger.info("TX worker stopped") + + def _transmit_packet(self, data: bytes): + """Transmit a packet using KISS mode temporarily.""" + if not self.serial_conn or not self.serial_conn.is_open: + return + + try: + # Enter KISS mode + self._send_command("serial mode kiss") + time.sleep(0.5) + self.kiss_mode_active = True + + # Encode as KISS frame + kiss_frame = self._encode_kiss_frame(data) + + # Send + self.serial_conn.write(kiss_frame) + self.serial_conn.flush() + + self.stats["frames_sent"] += 1 + self.stats["bytes_sent"] += len(data) + + logger.debug(f"TX packet: {len(data)} bytes") + + # Wait for transmission to complete + time.sleep(0.3) + + # Exit KISS mode and re-enable rxlog + self._exit_kiss_mode() + time.sleep(0.2) + self._clear_buffer() + self._send_command("rxlog on") + time.sleep(0.1) + self._clear_buffer() + + except Exception as e: + logger.error(f"Transmit error: {e}") + # Try to recover + self._exit_kiss_mode() + self._send_command("rxlog on") + + def _encode_kiss_frame(self, data: bytes) -> bytes: + """Encode data as a KISS frame.""" + frame = bytearray([KISS_FEND, 0x00]) # FEND + Data command + + for byte in data: + if byte == KISS_FEND: + frame.extend([KISS_FESC, KISS_TFEND]) + elif byte == KISS_FESC: + frame.extend([KISS_FESC, KISS_TFESC]) + else: + frame.append(byte) + + frame.append(KISS_FEND) + return bytes(frame) + + # ========================================================================= + # Interface methods compatible with KissSerialWrapper / LoRaRadio + # ========================================================================= + + def set_rx_callback(self, callback: Callable[[bytes], None]): + """Set the RX callback function.""" + self.on_frame_received = callback + # Capture the current event loop for thread-safe callbacks + try: + self._loop = asyncio.get_running_loop() + logger.debug("RX callback set with event loop reference") + except RuntimeError: + # No loop running yet, will be set later + logger.debug("RX callback set (no event loop yet)") + + def set_event_loop(self, loop: asyncio.AbstractEventLoop): + """Explicitly set the event loop for thread-safe callbacks.""" + self._loop = loop + logger.debug("Event loop reference set explicitly") + + def send_frame(self, data: bytes) -> bool: + """Queue a frame for transmission.""" + if not self.is_connected: + return False + + with self.tx_lock: + if len(self.tx_queue) < 100: + self.tx_queue.append(data) + return True + + logger.warning("TX queue full") + return False + + async def send(self, data: bytes) -> None: + """Async send interface for pymc_core compatibility.""" + if not self.send_frame(data): + raise Exception("Failed to queue frame for transmission") + return None + + def begin(self): + """Initialize the radio (called by some interfaces).""" + if not self.connect(): + raise Exception("Failed to initialize MeshTNC") + + def get_last_rssi(self) -> float: + """Get last received RSSI.""" + return self.stats.get("last_rssi") or -999 + + def get_last_snr(self) -> float: + """Get last received SNR.""" + return self.stats.get("last_snr") or -999 + + def get_stats(self) -> Dict[str, Any]: + """Get interface statistics.""" + return self.stats.copy() + + def sleep(self): + """Put radio to sleep (not supported).""" + pass + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect() diff --git a/modules/pymc_connection.py b/modules/pymc_connection.py index 7dd4c3e..1e8c714 100644 --- a/modules/pymc_connection.py +++ b/modules/pymc_connection.py @@ -30,6 +30,14 @@ except ImportError as e: KissSerialWrapper = None pymc_constants = None +# Import MeshTNC CLI mode wrapper (workaround for KISS RX bug) +try: + from modules.meshtnc_serial import MeshTNCSerial + MESHTNC_SERIAL_AVAILABLE = True +except ImportError: + MeshTNCSerial = None + MESHTNC_SERIAL_AVAILABLE = False + class EventType(IntEnum): """Event types compatible with meshcore EventType for bot compatibility.""" @@ -70,6 +78,230 @@ class ContactInfo: return getattr(self, key, default) +class ContactBookAdapter: + """ + Adapter to provide contact information from the bot's database to pymc_core. + + pymc_core's TextMessageHandler expects a contacts object with a 'contacts' + attribute that is iterable and contains contact objects with 'public_key'. + """ + + def __init__(self, bot): + self.bot = bot + self.logger = logging.getLogger("ContactBookAdapter") + self._contact_cache: List[Any] = [] + self._cache_time = 0 + self._cache_ttl = 30 # Refresh cache every 30 seconds + + @property + def contacts(self) -> List[Any]: + """Get contacts list (required by pymc_core's dispatcher).""" + return self._get_contacts() + + def _get_contacts(self) -> List[Any]: + """Get all contacts from the database.""" + import time as _time + + # Check if cache is still valid + now = _time.time() + if now - self._cache_time < self._cache_ttl and self._contact_cache: + return self._contact_cache + + contacts = [] + + try: + if not hasattr(self.bot, 'db_manager'): + self.logger.debug("Database manager not available for contacts") + return contacts + + # Query contacts from database + query = """ + SELECT public_key, name, role, device_type + FROM complete_contact_tracking + WHERE public_key IS NOT NULL AND public_key != '' + """ + + rows = self.bot.db_manager.execute_query(query) + + for row in rows: + # Create a simple object that pymc_core can use + contact = type('Contact', (), { + 'public_key': row.get('public_key', ''), + 'name': row.get('name', ''), + 'role': row.get('role', 'companion'), + })() + contacts.append(contact) + + self._contact_cache = contacts + self._cache_time = now + + self.logger.debug(f"Loaded {len(contacts)} contacts from database") + + except Exception as e: + self.logger.error(f"Error loading contacts: {e}") + + return contacts + + def get_contact_by_public_key(self, public_key: str): + """Find a contact by public key.""" + for contact in self.contacts: + if contact.public_key == public_key: + return contact + return None + + +class ChannelDatabaseAdapter: + """ + Adapter to provide channel information from the bot's database to pymc_core. + + pymc_core's GroupTextHandler expects a channel_db with a get_channels() method + that returns a list of dicts with 'name' and 'secret' keys. + """ + + # Well-known Public channel secret (from MeshCore spec) + PUBLIC_CHANNEL_SECRET = "d8ee687c9be53be08d24a7f7aede4dac5de3168dea03c12e7b9c96c5511e807f" + + # Common hashtag channels to always include + COMMON_HASHTAG_CHANNELS = ['#Public', '#general', '#test'] + + def __init__(self, bot): + self.bot = bot + self.logger = logging.getLogger("ChannelDatabaseAdapter") + self._channel_cache: List[Dict[str, Any]] = [] + self._cache_time = 0 + self._cache_ttl = 30 # Refresh cache every 30 seconds + + @staticmethod + def derive_hashtag_key(channel_name: str) -> str: + """ + Derive channel key from a hashtag channel name. + + For channels starting with #, the key is derived by SHA256 hashing + the channel name (including the #). + + Args: + channel_name: Channel name (e.g., "#howltest") + + Returns: + Hex string of the 32-byte derived key + """ + import hashlib + # Hash the channel name (including #) to get the key + key = hashlib.sha256(channel_name.encode('utf-8')).digest() + return key.hex() + + @staticmethod + def get_channel_hash(secret_hex: str) -> int: + """ + Get the channel hash from a secret (first byte of SHA256(secret)). + + This matches pymc_core's _derive_channel_hash method. + """ + import hashlib + try: + secret_bytes = bytes.fromhex(secret_hex) + except ValueError: + secret_bytes = secret_hex.encode('utf-8') + return hashlib.sha256(secret_bytes).digest()[0] + + def get_channels(self) -> List[Dict[str, Any]]: + """ + Get all channels with their secrets. + + Includes: + - Public channel (well-known key) + - Hashtag channels from config (derived keys) + - Custom channels from database + + Returns: + List of channel dicts with 'name' and 'secret' keys + """ + import time as _time + + # Check if cache is still valid + now = _time.time() + if now - self._cache_time < self._cache_ttl and self._channel_cache: + self.logger.debug(f"Returning cached {len(self._channel_cache)} channels") + return self._channel_cache + + channels = [] + self.logger.info("Building channel list...") + + # Always include the Public channel (well-known secret) + public_hash = self.get_channel_hash(self.PUBLIC_CHANNEL_SECRET) + channels.append({ + 'name': 'Public', + 'secret': self.PUBLIC_CHANNEL_SECRET, + 'idx': 0 + }) + self.logger.info(f"Added Public channel with hash 0x{public_hash:02X}") + + # Add common hashtag channels + for ch_name in self.COMMON_HASHTAG_CHANNELS: + derived_key = self.derive_hashtag_key(ch_name) + ch_hash = self.get_channel_hash(derived_key) + if not any(c['name'] == ch_name for c in channels): + channels.append({ + 'name': ch_name, + 'secret': derived_key, + 'idx': len(channels) + }) + self.logger.info(f"Added common channel {ch_name} with hash 0x{ch_hash:02X}") + + # Add hashtag channels from config + try: + if hasattr(self.bot, 'config'): + monitor_channels = self.bot.config.get('Channels', 'monitor_channels', fallback='') + if monitor_channels: + for ch_name in monitor_channels.split(','): + ch_name = ch_name.strip() + if ch_name.startswith('#') and not any(c['name'] == ch_name for c in channels): + # Derive key for hashtag channel + derived_key = self.derive_hashtag_key(ch_name) + ch_hash = self.get_channel_hash(derived_key) + channels.append({ + 'name': ch_name, + 'secret': derived_key, + 'idx': len(channels) + }) + self.logger.debug(f"Added config channel {ch_name} with hash 0x{ch_hash:02X}") + except Exception as e: + self.logger.warning(f"Error loading hashtag channels from config: {e}") + + # Add channels from database + try: + if hasattr(self.bot, 'db_manager'): + query = """ + SELECT channel_idx, channel_name, channel_key_hex + FROM channels + WHERE channel_key_hex IS NOT NULL AND channel_key_hex != '' + """ + + rows = self.bot.db_manager.execute_query(query) + + for row in rows: + channel_name = row.get('channel_name', '') + channel_key_hex = row.get('channel_key_hex', '') + + if channel_name and channel_key_hex: + # Don't duplicate if already added + if not any(c['name'] == channel_name for c in channels): + channels.append({ + 'name': channel_name, + 'secret': channel_key_hex, + 'idx': row.get('channel_idx', 0) + }) + except Exception as e: + self.logger.warning(f"Error loading channels from database: {e}") + + self._channel_cache = channels + self._cache_time = now + + self.logger.info(f"Loaded {len(channels)} channels (Public + {len(channels)-1} configured)") + + return channels + + class PyMCConnection: """ Wrapper around pyMC_core MeshNode providing a meshcore-compatible interface. @@ -170,41 +402,67 @@ class PyMCConnection: } self.logger.info(f"Radio config: freq={frequency/1e6:.3f}MHz, BW={bandwidth/1000}kHz, SF={spreading_factor}") - - # Create KISS serial wrapper - self._radio = KissSerialWrapper( - port=serial_port, - baudrate=baudrate, - radio_config=kiss_config, - auto_configure=True - ) - + + # Use MeshTNCSerial (CLI/rxlog mode) instead of KissSerialWrapper + # This is a workaround for the MeshTNC KISS mode RX bug where + # received packets are not forwarded to serial in KISS mode + if MESHTNC_SERIAL_AVAILABLE: + self.logger.info("Using MeshTNCSerial (CLI/rxlog mode) for RX") + self._radio = MeshTNCSerial( + port=serial_port, + baudrate=baudrate, + radio_config=kiss_config, + ) + else: + self.logger.info("Using KissSerialWrapper (KISS mode)") + self._radio = KissSerialWrapper( + port=serial_port, + baudrate=baudrate, + radio_config=kiss_config, + auto_configure=True + ) + # Connect to radio if not self._radio.connect(): self.logger.error(f"Failed to connect to MeshTNC on {serial_port}") return False - - self.logger.info("KISS radio connected successfully") - + + self.logger.info("Radio connected successfully") + # Get bot name from config bot_name = self.config.get('bot_name', 'MeshCoreBot') - - # Create mesh node + + # Create adapters for pymc_core + self._channel_db = ChannelDatabaseAdapter(self.bot) + self._contact_book = ContactBookAdapter(self.bot) + + # Create mesh node with channel database and contacts node_config = { "node": {"name": bot_name} } - + self._mesh_node = MeshNode( radio=self._radio, local_identity=self._identity, - config=node_config + config=node_config, + channel_db=self._channel_db, + contacts=self._contact_book ) - # Start the mesh node - await self._mesh_node.start() - - # Setup event handlers + # Setup event handlers BEFORE starting the node + # This ensures callbacks are registered before packets arrive self._setup_event_handlers() + + # Start the mesh node dispatcher as a background task + # (MeshNode.start() calls dispatcher.run_forever() which blocks) + self._dispatcher_task = asyncio.create_task(self._mesh_node.start()) + self.logger.info("Mesh node dispatcher started as background task") + + # Set the event loop on the radio for thread-safe callbacks + # This is needed because RX happens in a background thread + if hasattr(self._radio, 'set_event_loop'): + self._radio.set_event_loop(asyncio.get_running_loop()) + self.logger.info("Event loop set on radio for thread-safe callbacks") # Load contacts from database await self._load_contacts_from_database() @@ -231,21 +489,31 @@ class PyMCConnection: async def disconnect(self) -> None: """Disconnect from radio.""" self._connected = False - + + # Cancel the dispatcher task + if hasattr(self, '_dispatcher_task') and self._dispatcher_task: + if not self._dispatcher_task.done(): + self._dispatcher_task.cancel() + try: + await self._dispatcher_task + except asyncio.CancelledError: + pass + self._dispatcher_task = None + if self._mesh_node: try: - await self._mesh_node.stop() + self._mesh_node.stop() except Exception as e: self.logger.warning(f"Error stopping mesh node: {e}") self._mesh_node = None - + if self._radio: try: self._radio.disconnect() except Exception as e: self.logger.warning(f"Error disconnecting radio: {e}") self._radio = None - + self.logger.info("PyMC connection closed") def subscribe(self, event_type: EventType, callback: Callable) -> None: @@ -276,19 +544,22 @@ class PyMCConnection: self.logger.error(f"Error in event callback: {e}") def _setup_event_handlers(self) -> None: - """Setup handlers for pyMC_core events.""" + """Setup handlers for pyMC_core events via dispatcher callbacks.""" if not self._mesh_node: + self.logger.warning("Cannot setup event handlers - mesh node not initialized") return - - # Get the event service from mesh node - event_service = getattr(self._mesh_node, 'event_service', None) - if not event_service: - self.logger.warning("MeshNode does not have event_service") - return - - # Register handlers for different packet types - # pyMC_core uses a different event system, we need to translate - self.logger.info("Event handlers setup for pyMC_core") + + dispatcher = self._mesh_node.dispatcher + + # Register raw packet callback for logging and RAW_DATA events + dispatcher.set_raw_packet_callback(self._on_raw_packet) + self.logger.info("Registered raw packet callback with dispatcher") + + # Register packet received callback for processed packets + dispatcher.set_packet_received_callback(self._on_packet_received) + self.logger.info("Registered packet received callback with dispatcher") + + self.logger.info("Event handlers setup for pyMC_core dispatcher") async def _load_contacts_from_database(self) -> None: """Load contacts from the complete_contact_tracking table.""" @@ -364,140 +635,205 @@ class PyMCConnection: async def start_auto_message_fetching(self) -> None: """ - Start automatic message fetching loop. - - For pyMC_core, this starts the receive loop that processes - incoming packets from the KISS TNC. + Start automatic message fetching. + + For pyMC_core, message fetching is handled by the dispatcher's + callback system. This method ensures the dispatcher task is running. """ if not self._mesh_node: self.logger.warning("Cannot start message fetching - mesh node not initialized") return - - # Start background task for receiving packets - asyncio.create_task(self._receive_loop()) - self.logger.info("Auto message fetching started") - - async def _receive_loop(self) -> None: - """Background loop for receiving and processing packets.""" - self.logger.info("Starting receive loop") - - while self._connected and self._radio: - try: - # Check for incoming packets - if hasattr(self._radio, 'receive'): - packet = await asyncio.get_event_loop().run_in_executor( - None, self._radio.receive - ) - - if packet: - await self._process_incoming_packet(packet) - - # Small delay to prevent busy loop - await asyncio.sleep(0.01) - - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error in receive loop: {e}") - await asyncio.sleep(1) - - self.logger.info("Receive loop stopped") - - async def _process_incoming_packet(self, packet: bytes) -> None: + + # The dispatcher task was started in connect() + # Just verify it's still running + if hasattr(self, '_dispatcher_task') and self._dispatcher_task: + if self._dispatcher_task.done(): + # Check if it failed with an exception + try: + self._dispatcher_task.result() + except Exception as e: + self.logger.error(f"Dispatcher task failed: {e}") + # Restart it + self._dispatcher_task = asyncio.create_task(self._mesh_node.start()) + self.logger.info("Restarted dispatcher task") + + self.logger.info("Auto message fetching active (via dispatcher callbacks)") + + async def _on_raw_packet(self, pkt, data: bytes, analysis: dict = None) -> None: """ - Process an incoming packet and emit appropriate events. - - Args: - packet: Raw packet bytes from radio + Callback for raw packets from the dispatcher. + + This is called for EVERY packet received, before handler processing. + Emits RAW_DATA events for the bot's event system. """ try: - # Decode packet using pyMC_core - # This is a simplified version - full implementation would use - # pyMC_core's packet parsing - - if len(packet) < 2: - return - - # Extract header byte - header = packet[0] - route_type = header & 0x03 - payload_type = (header >> 2) & 0x0F - - # Calculate SNR/RSSI if available from KISS metadata - snr = 0 - rssi = -120 - - # Emit raw data event + # Get signal strength from packet or radio + snr = getattr(pkt, '_snr', 0) or (self._radio.get_last_snr() if self._radio else 0) + rssi = getattr(pkt, '_rssi', -120) or (self._radio.get_last_rssi() if self._radio else -120) + + self.logger.debug(f"Raw packet received: {len(data)} bytes, SNR={snr}, RSSI={rssi}") + + # Emit raw data event (with 'data' field for bot compatibility) await self._emit_event( EventType.RAW_DATA, { - 'raw_hex': packet.hex(), + 'data': data.hex(), # Bot expects 'data' field + 'raw_hex': data.hex(), 'SNR': snr, 'RSSI': rssi, 'timestamp': time.time() } ) - - # Parse based on payload type - if payload_type == 0x01: # TXT_MSG - await self._handle_text_message(packet, snr, rssi) - elif payload_type == 0x04: # ADVERT - await self._handle_advertisement(packet, snr, rssi) - + + # Also emit RX_LOG_DATA for signal metrics + await self._emit_event( + EventType.RX_LOG_DATA, + { + 'SNR': snr, + 'RSSI': rssi, + 'timestamp': time.time() + } + ) + except Exception as e: - self.logger.error(f"Error processing packet: {e}") - - async def _handle_text_message(self, packet: bytes, snr: float, rssi: float) -> None: - """Handle incoming text message packet.""" + self.logger.error(f"Error in raw packet callback: {e}") + + async def _on_packet_received(self, pkt) -> None: + """ + Callback for processed packets from the dispatcher. + + This is called after the dispatcher has parsed and handled the packet. + We translate pymc_core packets to our event system. + """ try: - # Parse message packet - # This is simplified - full implementation needs proper MeshCore parsing - - header = packet[0] - route_type = header & 0x03 - - # Skip header and extract path/payload - # Format depends on route type - - payload = { - 'text': '', # Will be decrypted - 'pubkey_prefix': '', + # Import constants for payload types + from pymc_core.protocol.constants import ( + PAYLOAD_TYPE_ADVERT, + PAYLOAD_TYPE_ACK, + PAYLOAD_TYPE_TXT_MSG, + PAYLOAD_TYPE_GRP_TXT, + PH_TYPE_SHIFT, + ) + + payload_type = pkt.header >> PH_TYPE_SHIFT + snr = getattr(pkt, '_snr', 0) + rssi = getattr(pkt, '_rssi', -120) + + self.logger.info(f"Packet received: type={payload_type}, SNR={snr}, RSSI={rssi}") + + if payload_type == PAYLOAD_TYPE_ADVERT: + await self._handle_advert_packet(pkt, snr, rssi) + elif payload_type == PAYLOAD_TYPE_TXT_MSG: + await self._handle_text_packet(pkt, snr, rssi) + elif payload_type == PAYLOAD_TYPE_GRP_TXT: + await self._handle_group_text_packet(pkt, snr, rssi) + # ACKs are handled by the dispatcher internally + + except Exception as e: + self.logger.error(f"Error in packet received callback: {e}") + import traceback + self.logger.error(traceback.format_exc()) + + async def _handle_advert_packet(self, pkt, snr: float, rssi: float) -> None: + """Handle advertisement packet from dispatcher.""" + try: + # Extract advertisement info from packet + # The advert handler in pymc_core parses this + payload = pkt.payload if hasattr(pkt, 'payload') else b'' + + # Basic info we can extract + event_payload = { + 'public_key': payload[2:34].hex() if len(payload) >= 34 else '', + 'name': '', # Name is in the advert data after pubkey + 'latitude': None, + 'longitude': None, + 'flags': payload[1] if len(payload) > 1 else 0, + 'SNR': snr, + 'RSSI': rssi, + 'raw_hex': payload.hex() if payload else '' + } + + # Try to extract name from advert + if len(payload) > 34: + try: + name_end = payload.find(b'\x00', 34) + if name_end > 34: + event_payload['name'] = payload[34:name_end].decode('utf-8', errors='ignore') + except Exception: + pass + + self.logger.info(f"Advertisement from {event_payload.get('name', 'unknown')}") + + # Emit new contact event + await self._emit_event(EventType.NEW_CONTACT, event_payload) + await self._emit_event(EventType.ADVERT_RECV, event_payload) + + except Exception as e: + self.logger.error(f"Error handling advertisement: {e}") + + async def _handle_text_packet(self, pkt, snr: float, rssi: float) -> None: + """Handle text message packet from dispatcher.""" + try: + # The text handler in pymc_core decrypts and parses the message + # We need to extract the relevant info + payload = pkt.payload if hasattr(pkt, 'payload') else b'' + + # Get source hash from payload + src_hash = payload[1] if len(payload) > 1 else 0 + + event_payload = { + 'text': '', # Will need decryption via handler + 'pubkey_prefix': f'{src_hash:02x}', 'SNR': snr, 'RSSI': rssi, 'path_len': 0, - 'raw_hex': packet.hex() + 'raw_hex': payload.hex() if payload else '' } - - # Emit as DM or channel message based on route type - # For now, emit as contact message - await self._emit_event(EventType.CONTACT_MSG_RECV, payload) - + + self.logger.info(f"Text message from {event_payload['pubkey_prefix']}") + + # Emit contact message event + await self._emit_event(EventType.CONTACT_MSG_RECV, event_payload) + except Exception as e: self.logger.error(f"Error handling text message: {e}") - - async def _handle_advertisement(self, packet: bytes, snr: float, rssi: float) -> None: - """Handle incoming advertisement packet.""" + + async def _handle_group_text_packet(self, pkt, snr: float, rssi: float) -> None: + """Handle group/channel text message packet from dispatcher.""" try: - # Parse advertisement packet - # Advertisements contain node identity, location, flags - - payload = { - 'public_key': '', - 'name': '', - 'latitude': None, - 'longitude': None, - 'flags': 0, + payload = pkt.payload if hasattr(pkt, 'payload') else b'' + + # The GroupTextHandler in pymc_core stores decrypted data in pkt.decrypted + group_data = getattr(pkt, 'decrypted', {}).get('group_text_data', {}) + + text = group_data.get('text', '') + channel_name = group_data.get('channel_name', '') + sender_name = group_data.get('sender_name', 'Unknown') + channel_hash = group_data.get('channel_hash', 0) + + # If group_data is empty, the message wasn't decrypted (missing channel key) + if not group_data: + self.logger.warning(f"Channel message not decrypted - missing channel key? hash=0x{payload[0]:02X}") + + event_payload = { + 'text': text, + 'channel': channel_name, + 'channel_name': channel_name, + 'channel_idx': channel_hash, # Use hash as index for compatibility + 'sender': sender_name, + 'sender_name': sender_name, 'SNR': snr, 'RSSI': rssi, - 'raw_hex': packet.hex() + 'raw_hex': payload.hex() if payload else '' } - - # Emit new contact event - await self._emit_event(EventType.NEW_CONTACT, payload) - await self._emit_event(EventType.ADVERT_RECV, payload) - + + self.logger.info(f"Channel message on [{channel_name}] from {sender_name}: {text[:50]}...") + + # Emit channel message event + await self._emit_event(EventType.CHANNEL_MSG_RECV, event_payload) + except Exception as e: - self.logger.error(f"Error handling advertisement: {e}") + self.logger.error(f"Error handling group text: {e}") class PyMCCommands: @@ -624,31 +960,51 @@ class PyMCCommands: contacts = list(self.connection._contacts.values()) return Event(type=EventType.OK, payload={'contacts': contacts}) - async def add_contact(self, public_key: str, name: str) -> Event: + async def add_contact(self, public_key_or_data, name: str = None) -> Event: """ Add a contact to the database. - - Args: - public_key: Contact's public key - name: Contact's name - + + Supports multiple calling conventions: + add_contact(contact_data_dict) + add_contact(public_key, name) + add_contact(name, public_key) + Returns: Event indicating success or failure """ try: - # Add to database via repeater manager - if hasattr(self.connection.bot, 'repeater_manager'): - # This will be tracked via advertisement handling - pass - + # Handle dict argument (contact_data) + if isinstance(public_key_or_data, dict): + contact_data = public_key_or_data + public_key = contact_data.get('public_key', '') + name = contact_data.get('name', contact_data.get('adv_name', '')) + elif name is None: + # Single string argument - assume it's a name + name = public_key_or_data + public_key = '' + else: + # Two arguments - could be (key, name) or (name, key) + if len(public_key_or_data) == 64: + # Looks like a public key (64 hex chars) + public_key = public_key_or_data + else: + # Assume first is name, second is key + public_key = name + name = public_key_or_data + + if not name: + name = f"Contact_{public_key[:8]}" if public_key else "Unknown" + # Add to local cache contact = ContactInfo(public_key=public_key, name=name) - key = public_key[:12] + key = public_key[:12] if public_key else name self.connection._contacts[key] = contact - + + self.logger.info(f"Added contact: {name} ({public_key[:12] if public_key else 'no key'})") return Event(type=EventType.OK, payload={'added': True}) - + except Exception as e: + self.logger.error(f"Error adding contact: {e}") return Event(type=EventType.ERROR, payload={'error': str(e)}) async def remove_contact(self, public_key: str) -> Event: @@ -667,6 +1023,87 @@ class PyMCCommands: del self.connection._contacts[key] return Event(type=EventType.OK, payload={'removed': True}) - + except Exception as e: return Event(type=EventType.ERROR, payload={'error': str(e)}) + + async def get_channel(self, channel_idx: int) -> Event: + """ + Get channel information from database. + + Args: + channel_idx: Channel index + + Returns: + Event with channel info + """ + try: + if not hasattr(self.connection.bot, 'db_manager'): + return Event(type=EventType.ERROR, payload={'error': 'Database not available'}) + + query = """ + SELECT channel_idx, channel_name, channel_type, channel_key_hex + FROM channels + WHERE channel_idx = ? + """ + rows = self.connection.bot.db_manager.execute_query(query, (channel_idx,)) + + if rows: + row = rows[0] + return Event(type=EventType.OK, payload={ + 'channel_idx': row.get('channel_idx'), + 'channel_name': row.get('channel_name', ''), + 'channel_type': row.get('channel_type', ''), + 'channel_key_hex': row.get('channel_key_hex', ''), + 'channel_secret': bytes.fromhex(row.get('channel_key_hex', '')) if row.get('channel_key_hex') else b'' + }) + else: + return Event(type=EventType.ERROR, payload={'error': f'Channel {channel_idx} not found'}) + + except Exception as e: + self.logger.error(f"Error getting channel: {e}") + return Event(type=EventType.ERROR, payload={'error': str(e)}) + + async def set_channel(self, channel_idx: int, channel_name: str, channel_secret) -> Event: + """ + Set/create a channel in the database. + + Args: + channel_idx: Channel index + channel_name: Channel name + channel_secret: Channel key (bytes or hex string) + + Returns: + Event indicating success or failure + """ + try: + if not hasattr(self.connection.bot, 'db_manager'): + return Event(type=EventType.ERROR, payload={'error': 'Database not available'}) + + # Convert secret to hex string if needed + if isinstance(channel_secret, bytes): + channel_key_hex = channel_secret.hex() + else: + channel_key_hex = str(channel_secret) + + # Upsert channel in database + query = """ + INSERT OR REPLACE INTO channels (channel_idx, channel_name, channel_key_hex, last_updated) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + """ + self.connection.bot.db_manager.execute_query(query, (channel_idx, channel_name, channel_key_hex)) + + # Invalidate channel cache + if hasattr(self.connection, '_channel_db'): + self.connection._channel_db._cache_time = 0 + + self.logger.info(f"Set channel {channel_idx}: {channel_name}") + return Event(type=EventType.OK, payload={ + 'channel_idx': channel_idx, + 'channel_name': channel_name, + 'channel_key_hex': channel_key_hex + }) + + except Exception as e: + self.logger.error(f"Error setting channel: {e}") + return Event(type=EventType.ERROR, payload={'error': str(e)})