diff --git a/docs/path-command-config.md b/docs/path-command-config.md index 274901f..36a188a 100644 --- a/docs/path-command-config.md +++ b/docs/path-command-config.md @@ -2,6 +2,15 @@ This document explains all configuration parameters for the Path Command, which decodes hex path data to identify repeaters in message routing paths. +## Multi-byte path support + +The path command supports **1-, 2-, and 3-byte-per-hop** paths (2, 4, or 6 hex characters per node). + +- **`path` with no arguments**: Uses the current message’s decoded path when available (from routing info). No re-parsing; node list and hop size come from the packet. +- **`path ` with arguments**: + - **Comma-separated** (e.g. `path 0102,5f7e`): Hop size is inferred from token length. All tokens must be the same length (2, 4, or 6 hex chars). Example: `0102,5f7e` → two 2-byte hops. + - **Continuous hex** (e.g. `path 01025f7e`): The bot’s [Bot] **`prefix_bytes`** is used (2 hex chars = 1 byte, 4 = 2 bytes, 6 = 3 bytes). Use comma-separated input to force a multi-byte interpretation when the bot is in 1-byte mode. + ## Quick Start: Presets The Path Command supports three presets that configure multiple related settings: diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index 0a84d10..5526258 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -825,25 +825,32 @@ class BaseCommand(ABC): """Check if this command can execute right now (permissions, cooldown, etc.)""" return self.can_execute(message) + def get_path_display_string(self, message: MeshMessage) -> str: + """Get path string for display (test/ack placeholders). Prefers message.routing_info for multi-byte and direct.""" + routing_info = getattr(message, 'routing_info', None) + if routing_info is not None: + path_length = routing_info.get('path_length', 0) + if path_length == 0: + return "Direct" + path_nodes = routing_info.get('path_nodes', []) + if path_nodes: + path_str = ','.join(str(n).lower() for n in path_nodes) + return f"{path_str} ({len(path_nodes)} hops)" + if not message.path: + return "Unknown" + path_string = message.path + if " via ROUTE_TYPE_" in path_string: + path_string = path_string.split(" via ROUTE_TYPE_")[0] + return path_string.strip() or "Unknown" + def build_enhanced_connection_info(self, message: MeshMessage) -> str: - """Build enhanced connection info with SNR, RSSI, and parsed route information""" - # Extract just the hops and path info without the route type - routing_info = message.path or "Unknown routing" - - # Clean up the routing info to remove the "via ROUTE_TYPE_*" part - if "via ROUTE_TYPE_" in routing_info: - # Extract just the hops and path part - parts = routing_info.split(" via ROUTE_TYPE_") - if len(parts) > 0: - routing_info = parts[0] - - # Add SNR and RSSI + """Build enhanced connection info with SNR, RSSI, and parsed route information. + Uses message.routing_info when present (multi-byte path, direct) for path part. + """ + path_part = self.get_path_display_string(message) snr_info = f"SNR: {message.snr or 'Unknown'} dB" rssi_info = f"RSSI: {message.rssi or 'Unknown'} dBm" - - # Build enhanced connection info - connection_info = f"{routing_info} | {snr_info} | {rssi_info}" - + connection_info = f"{path_part} | {snr_info} | {rssi_info}" return connection_info def format_timestamp(self, message: MeshMessage) -> str: diff --git a/modules/commands/multitest_command.py b/modules/commands/multitest_command.py index 8782751..1454f52 100644 --- a/modules/commands/multitest_command.py +++ b/modules/commands/multitest_command.py @@ -5,12 +5,13 @@ Listens for a period of time and collects all unique paths from incoming message """ import asyncio +import re import time from typing import Set, Optional, Dict from dataclasses import dataclass from .base_command import BaseCommand from ..models import MeshMessage -from ..utils import calculate_packet_hash +from ..utils import calculate_packet_hash, parse_path_string @dataclass @@ -113,80 +114,109 @@ class MultitestCommand(BaseCommand): return False def extract_path_from_rf_data(self, rf_data: dict) -> Optional[str]: - """Extract path in prefix string format from RF data routing_info""" + """Extract path in prefix string format from RF data routing_info. + Supports 1-, 2-, and 3-byte-per-hop (2, 4, or 6 hex chars per node). + """ try: routing_info = rf_data.get('routing_info') if not routing_info: return None - + path_nodes = routing_info.get('path_nodes', []) if not path_nodes: - # Try to extract from path_hex if path_nodes not available + # Fallback: build from path_hex using bytes_per_hop from packet path_hex = routing_info.get('path_hex', '') if path_hex: - n = getattr(self.bot, 'prefix_hex_chars', 2) + bytes_per_hop = routing_info.get('bytes_per_hop') + n = (bytes_per_hop * 2) if bytes_per_hop and bytes_per_hop >= 1 else getattr(self.bot, 'prefix_hex_chars', 2) if n <= 0: n = 2 - path_nodes = [path_hex[i:i+n] for i in range(0, len(path_hex), n)] + path_nodes = [path_hex[i:i + n] for i in range(0, len(path_hex), n)] if (len(path_hex) % n) != 0: - path_nodes = [path_hex[i:i+2] for i in range(0, len(path_hex), 2)] - + path_nodes = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)] + if path_nodes: - # Validate and format path nodes - valid_len = getattr(self.bot, 'prefix_hex_chars', 2) - if valid_len <= 0: - valid_len = 2 + # Validate: each node 2, 4, or 6 hex chars valid_parts = [] for node in path_nodes: node_str = str(node).lower().strip() - if len(node_str) in (2, valid_len) and all(c in '0123456789abcdef' for c in node_str): + if len(node_str) in (2, 4, 6) and all(c in '0123456789abcdef' for c in node_str): valid_parts.append(node_str) - if valid_parts: return ','.join(valid_parts) - return None except Exception as e: self.logger.debug(f"Error extracting path from RF data: {e}") return None def extract_path_from_message(self, message: MeshMessage) -> Optional[str]: - """Extract path in prefix string format from a message""" + """Extract path in prefix string format from a message. + Prefers message.routing_info.path_nodes when present (multi-byte). + When routing_info has bytes_per_hop, uses it instead of inferring hop size. + Otherwise parses message.path: comma-separated tokens infer (2/4/6 hex); + continuous hex uses bot.prefix_hex_chars via parse_path_string(). + """ + routing_info = getattr(message, 'routing_info', None) + bytes_per_hop = routing_info.get('bytes_per_hop') if routing_info else None + hex_chars_per_node = (bytes_per_hop * 2) if (bytes_per_hop and bytes_per_hop >= 1) else None + + # Prefer routing_info when present (same as path command; no re-parse) + if routing_info is not None: + if routing_info.get('path_length', 0) == 0: + return None + path_nodes = routing_info.get('path_nodes', []) + if not path_nodes and hex_chars_per_node: + # Build from path_hex when path_nodes missing but bytes_per_hop known + path_hex = routing_info.get('path_hex', '') + if path_hex: + n = hex_chars_per_node + path_nodes = [path_hex[i:i + n] for i in range(0, len(path_hex), n)] + if (len(path_hex) % n) != 0: + path_nodes = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)] + if path_nodes: + valid = [str(n).lower().strip() for n in path_nodes + if len(str(n).strip()) in (2, 4, 6) and all(c in '0123456789abcdef' for c in str(n).lower().strip())] + if valid: + return ','.join(valid) if not message.path: return None - - # Check if it's a direct connection if "Direct" in message.path or "0 hops" in message.path: return None - - # Try to extract path nodes from the path string - # Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*" - # or just "node1,node2,node3" + path_string = message.path - - # Remove route type suffix if present if " via ROUTE_TYPE_" in path_string: path_string = path_string.split(" via ROUTE_TYPE_")[0] - - # Check if it looks like a comma-separated path + path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip() + + # When bytes_per_hop is known, use it; otherwise infer from commas or use bot.prefix_hex_chars + expected_n = hex_chars_per_node or getattr(self.bot, 'prefix_hex_chars', 2) + if expected_n <= 0: + expected_n = 2 + if ',' in path_string: - # Clean up any extra info (like hop counts in parentheses) - # Example: "01,7e,55,86 (4 hops)" -> "01,7e,55,86" - if '(' in path_string: - path_string = path_string.split('(')[0].strip() - - # Validate that all parts are 2-character hex values - parts = path_string.split(',') - valid_parts = [] - for part in parts: - part = part.strip() - # Check if it's a 2-character hex value - if len(part) == 2 and all(c in '0123456789abcdefABCDEF' for c in part): - valid_parts.append(part.lower()) - - if valid_parts: - return ','.join(valid_parts) - + tokens = [t.strip() for t in path_string.split(',') if t.strip()] + if tokens: + if hex_chars_per_node: + # Use known hop size: all tokens must be that length + valid_hex = all( + len(t) == expected_n and all(c in '0123456789aAbBcCdDeEfF' for c in t) + for t in tokens + ) + if valid_hex: + return ','.join(t.lower() for t in tokens) + else: + # Infer from token length (2, 4, or 6, all same) + lengths = {len(t) for t in tokens} + valid_hex = all( + len(t) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in t) + for t in tokens + ) + if valid_hex and len(lengths) == 1 and next(iter(lengths)) in (2, 4, 6): + return ','.join(t.lower() for t in tokens) + # Continuous hex: use bytes_per_hop when known, else bot.prefix_hex_chars + node_ids = parse_path_string(path_string, prefix_hex_chars=expected_n) + if node_ids: + return ','.join(n.lower() for n in node_ids) return None def get_rf_data_for_message(self, message: MeshMessage) -> Optional[dict]: diff --git a/modules/commands/path_command.py b/modules/commands/path_command.py index 7f67519..2c0b52d 100644 --- a/modules/commands/path_command.py +++ b/modules/commands/path_command.py @@ -10,7 +10,7 @@ import asyncio from typing import List, Optional, Dict, Any, Tuple, Callable from .base_command import BaseCommand from ..models import MeshMessage -from ..utils import calculate_distance +from ..utils import calculate_distance, parse_path_string class PathCommand(BaseCommand): @@ -217,39 +217,39 @@ class PathCommand(BaseCommand): return True async def _decode_path(self, path_input: str) -> str: - """Decode hex path data to repeater names""" + """Decode hex path data to repeater names. + Comma-separated tokens infer hop size (2, 4, or 6 hex chars per node). + Otherwise uses bot.prefix_hex_chars via parse_path_string(). + """ try: - # Parse the path input - handle various formats - # Examples: "11,98,a4,49,cd,5f,01" or "11 98 a4 49 cd 5f 01" or "1198a449cd5f01" - path_input = path_input.replace(',', ' ').replace(':', ' ') - - # Extract hex values using regex - # Try configured width first - n = getattr(self.bot, "prefix_hex_chars", 2) - hex_pattern = rf'[0-9a-fA-F]{{{n}}}' - hex_matches = re.findall(hex_pattern, path_input) + # Strip hop-count suffix if present (e.g. "01,5f (2 hops)") + path_input = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_input, flags=re.IGNORECASE) + path_input = path_input.strip() - # Backward compatibility: - # if no matches and we're expecting >2 chars, try legacy 2-char paths - if not hex_matches and n > 2: - legacy_pattern = r'[0-9a-fA-F]{2}' - hex_matches = re.findall(legacy_pattern, path_input) - - if not hex_matches: + node_ids = None + # Comma-separated: infer hex chars per node from token length (2, 4, or 6) + if ',' in path_input: + tokens = [t.strip() for t in path_input.split(',') if t.strip()] + if tokens: + lengths = {len(t) for t in tokens} + valid_hex = all( + len(t) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in t) + for t in tokens + ) + if valid_hex and len(lengths) == 1 and next(iter(lengths)) in (2, 4, 6): + node_ids = [t.upper() for t in tokens] + + if node_ids is None: + prefix_hex_chars = getattr(self.bot, 'prefix_hex_chars', 2) + node_ids = parse_path_string(path_input, prefix_hex_chars=prefix_hex_chars) + + if not node_ids: return self.translate('commands.path.no_valid_hex') - - # Convert to uppercase for consistency - # hex_matches preserves the order from the original path - node_ids = [match.upper() for match in hex_matches] - + self.logger.info(f"Decoding path with {len(node_ids)} nodes: {','.join(node_ids)}") - - # Look up repeater names for each node ID (order preserved) repeater_info = await self._lookup_repeater_names(node_ids) - - # Format the response return self._format_path_response(node_ids, repeater_info) - + except Exception as e: self.logger.error(f"Error decoding path: {e}") return self.translate('commands.path.error_decoding', error=str(e)) @@ -1418,24 +1418,28 @@ class PathCommand(BaseCommand): if stored_paths: # Build the path we're decoding (full path context) decoded_path_hex = ''.join([node.lower() for node in path_context]) - + path_n = len(path_context[0]) if path_context else 0 # hex chars per node in current path + # Check if any stored path shares common segments with decoded path - # This is useful for prefix collision resolution for stored_path in stored_paths: stored_hex = stored_path.get('path_hex', '').lower() obs_count = stored_path.get('observation_count', 1) - - if stored_hex: - # Chunk using stored bytes_per_hop (multi-byte path support) - n = (stored_path.get('bytes_per_hop') or 1) * 2 - if n <= 0: - n = 2 - stored_nodes = [stored_hex[i:i+n] for i in range(0, len(stored_hex), n)] - if (len(stored_hex) % n) != 0: - stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)] - decoded_nodes = [decoded_path_hex[i:i+n] for i in range(0, len(decoded_path_hex), n)] - if (len(decoded_path_hex) % n) != 0: - decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)] + + if not stored_hex: + continue + # Chunk stored path by its bytes_per_hop + stored_n = (stored_path.get('bytes_per_hop') or 1) * 2 + if stored_n <= 0: + stored_n = 2 + # Only compare when same hop size (otherwise 1-byte vs 2-byte would mismatch) + if path_n != stored_n: + continue + stored_nodes = [stored_hex[i:i+stored_n] for i in range(0, len(stored_hex), stored_n)] + if (len(stored_hex) % stored_n) != 0: + stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)] + decoded_nodes = [decoded_path_hex[i:i+path_n] for i in range(0, len(decoded_path_hex), path_n)] + if (len(decoded_path_hex) % path_n) != 0: + decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)] # Count how many nodes appear in both paths (in order) common_segments = 0 @@ -1717,42 +1721,48 @@ class PathCommand(BaseCommand): await self.send_response(message, current_message, skip_user_rate_limit=True) async def _extract_path_from_recent_messages(self) -> str: - """Extract path from the current message's path information (same as test command)""" + """Extract path from the current message's path information (same as test command). + Prefers already-extracted routing_info.path_nodes when present (multi-byte path support). + """ try: - # Use the path information from the current message being processed - # This is the same reliable source that the test command uses - if hasattr(self, '_current_message') and self._current_message and self._current_message.path: - path_string = self._current_message.path - - # Check if it's a direct connection - if "Direct" in path_string or "0 hops" in path_string: - return self.translate('commands.path.direct_connection') - - # Try to extract path nodes from the path string - # Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*" - if " via ROUTE_TYPE_" in path_string: - # Extract just the path part before the route type - path_part = path_string.split(" via ROUTE_TYPE_")[0] - else: - path_part = path_string - - # Check if it looks like a comma-separated path - if ',' in path_part: - path_input = path_part - return await self._decode_path(path_input) - else: - # Try to decode even single nodes (e.g., "01" should be decoded to a repeater name) - # Check if path_part looks like it contains hex values - hex_pattern = rf'[0-9a-fA-F]{{{self.bot.prefix_hex_chars}}}' - if re.search(hex_pattern, path_part): - # Looks like hex values, try to decode - return await self._decode_path(path_part) - else: - # Unknown format - just show the raw path string - return self.translate('commands.path.path_prefix', path_string=path_string) - else: + if not hasattr(self, '_current_message') or not self._current_message: return self.translate('commands.path.no_path') - + + msg = self._current_message + + # Prefer routing_info when present (no re-parsing; preserves bytes_per_hop) + routing_info = getattr(msg, 'routing_info', None) + if routing_info is not None: + path_length = routing_info.get('path_length', 0) + if path_length == 0: + return self.translate('commands.path.direct_connection') + path_nodes = routing_info.get('path_nodes', []) + if path_nodes: + node_ids = [n.upper() for n in path_nodes] + self.logger.info(f"Decoding path from routing_info with {len(node_ids)} nodes: {','.join(node_ids)}") + repeater_info = await self._lookup_repeater_names(node_ids) + return self._format_path_response(node_ids, repeater_info) + + # Fallback: parse message.path string (e.g. no routing_info or legacy path) + if not msg.path: + return self.translate('commands.path.no_path') + + path_string = msg.path + if "Direct" in path_string or "0 hops" in path_string: + return self.translate('commands.path.direct_connection') + + if " via ROUTE_TYPE_" in path_string: + path_part = path_string.split(" via ROUTE_TYPE_")[0] + else: + path_part = path_string + + if ',' in path_part: + return await self._decode_path(path_part) + hex_pattern = rf'[0-9a-fA-F]{{{getattr(self.bot, "prefix_hex_chars", 2)}}}' + if re.search(hex_pattern, path_part): + return await self._decode_path(path_part) + return self.translate('commands.path.path_prefix', path_string=path_string) + except Exception as e: self.logger.error(f"Error extracting path from current message: {e}") return self.translate('commands.path.error_extracting', error=str(e)) diff --git a/modules/commands/test_command.py b/modules/commands/test_command.py index c763f9a..0812873 100644 --- a/modules/commands/test_command.py +++ b/modules/commands/test_command.py @@ -145,48 +145,30 @@ class TestCommand(BaseCommand): return None def _extract_path_node_ids(self, message: MeshMessage) -> List[str]: - """Extract path node IDs from message path string. + """Extract path node IDs from message. Prefers routing_info.path_nodes (multi-byte); else parses message.path. - Args: - message: The message object containing the path. - Returns: - List[str]: List of valid 2-character hex node IDs. + List[str]: List of node IDs (2, 4, or 6 hex chars per node, uppercase). """ - if not message.path: + routing_info = getattr(message, 'routing_info', None) + if routing_info is not None and routing_info.get('path_length', 0) == 0: return [] - - # Check if it's a direct connection - if "Direct" in message.path or "0 hops" in message.path: + if routing_info and routing_info.get('path_nodes'): + return [str(n).upper().strip() for n in routing_info['path_nodes']] + if not message.path or "Direct" in message.path or "0 hops" in message.path: return [] - - # Extract path nodes from the path string - # Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*" - # or just "node1,node2,node3" path_string = message.path - - # Remove route type suffix if present if " via ROUTE_TYPE_" in path_string: path_string = path_string.split(" via ROUTE_TYPE_")[0] - - # Check if it looks like a comma-separated path + if '(' in path_string: + path_string = path_string.split('(')[0].strip() if ',' in path_string: - # Clean up any extra info (like hop counts in parentheses) - # Example: "01,7e,55,86 (4 hops)" -> "01,7e,55,86" - if '(' in path_string: - path_string = path_string.split('(')[0].strip() - - # Validate that all parts are 2-character hex values - parts = path_string.split(',') - valid_parts = [] - for part in parts: - part = part.strip() - # Check if it's a 2-character hex value - if len(part) == 2 and all(c in '0123456789abcdefABCDEF' for c in part): - valid_parts.append(part.upper()) - - return valid_parts - + parts = [p.strip() for p in path_string.split(',') if p.strip()] + if parts and all( + len(p) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in p) + for p in parts + ): + return [p.upper() for p in parts] return [] @@ -576,8 +558,14 @@ class TestCommand(BaseCommand): """ node_ids = self._extract_path_node_ids(message) if len(node_ids) < 2: - # Check if it's a direct connection - if not message.path or "Direct" in message.path or "0 hops" in message.path: + routing_info = getattr(message, 'routing_info', None) + is_direct = ( + (routing_info is not None and routing_info.get('path_length', 0) == 0) + or not message.path + or "Direct" in (message.path or "") + or "0 hops" in (message.path or "") + ) + if is_direct: return "N/A" # Direct connection, no path to calculate return "" # Path exists but insufficient nodes @@ -628,8 +616,14 @@ class TestCommand(BaseCommand): """ node_ids = self._extract_path_node_ids(message) if len(node_ids) < 2: - # Check if it's a direct connection - if not message.path or "Direct" in message.path or "0 hops" in message.path: + routing_info = getattr(message, 'routing_info', None) + is_direct = ( + (routing_info is not None and routing_info.get('path_length', 0) == 0) + or not message.path + or "Direct" in (message.path or "") + or "0 hops" in (message.path or "") + ) + if is_direct: return "N/A" # Direct connection, no path to calculate return "" # Path exists but insufficient nodes @@ -686,20 +680,35 @@ class TestCommand(BaseCommand): connection_info = self.build_enhanced_connection_info(message) timestamp = self.format_timestamp(message) elapsed = self.format_elapsed(message) - - # Calculate distance placeholders + path_display = self.get_path_display_string(message) + # Hops: from message.hops, or routing_info.path_length, or len(path_nodes) + routing_info = getattr(message, 'routing_info', None) + if getattr(message, 'hops', None) is not None: + hops_val = message.hops + elif routing_info is not None: + hops_val = routing_info.get('path_length') + if hops_val is None and routing_info.get('path_nodes'): + hops_val = len(routing_info['path_nodes']) + else: + hops_val = None + hops_str = str(hops_val) if hops_val is not None else "?" + if hops_val is None: + hops_label = "?" + elif hops_val == 1: + hops_label = "1 hop" + else: + hops_label = f"{hops_val} hops" path_distance = self._calculate_path_distance(message) firstlast_distance = self._calculate_firstlast_distance(message) - - # Format phrase part - add colon and space if phrase exists phrase_part = f": {phrase}" if phrase else "" - return response_format.format( sender=message.sender_id or self.translate('common.unknown_sender'), phrase=phrase, phrase_part=phrase_part, connection_info=connection_info, - path=message.path or self.translate('common.unknown_path'), + path=path_display, + hops=hops_str, + hops_label=hops_label, timestamp=timestamp, elapsed=elapsed, snr=message.snr or self.translate('common.unknown'), diff --git a/modules/message_handler.py b/modules/message_handler.py index 7655ab1..1aa4cfa 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -371,6 +371,7 @@ class MessageHandler: # If we have RF data with routing information, update the path with that instead if recent_rf_data and recent_rf_data.get('routing_info'): rf_routing = recent_rf_data['routing_info'] + message.routing_info = rf_routing # Path command uses this for multi-byte path (no re-parse) if rf_routing.get('path_length', 0) > 0: path_nodes = rf_routing.get('path_nodes', []) route_type = rf_routing.get('route_type', 'Unknown') @@ -763,13 +764,39 @@ class MessageHandler: 'payload_type': decoded_packet.get('payload_type_name', 'Unknown'), 'packet_hash': packet_hash # Store hash for packet tracking } - + # Validate path consistency (path_byte_length, path_hex, path_nodes, bytes_per_hop) + path_len = routing_info['path_length'] + path_byte_len = routing_info.get('path_byte_length') + path_hex_str = routing_info.get('path_hex', '') + path_nodes_list = routing_info.get('path_nodes') or [] + bph = routing_info.get('bytes_per_hop', 1) or 1 + expected_hex_len = (path_byte_len * 2) if path_byte_len is not None else (path_len * bph * 2) + if path_len > 0 and path_hex_str: + if len(path_hex_str) != expected_hex_len: + self.logger.warning( + "Path length mismatch: path_hex has %d hex chars, expected %d (path_byte_length=%s, path_length=%s, bytes_per_hop=%s)", + len(path_hex_str), expected_hex_len, path_byte_len, path_len, bph + ) + if path_nodes_list and len(path_nodes_list) != path_len: + self.logger.warning( + "Path nodes count mismatch: %d nodes, path_length=%d", + len(path_nodes_list), path_len + ) + if path_nodes_list and bph >= 1 and any(len(str(n)) != bph * 2 for n in path_nodes_list): + self.logger.warning( + "Path node width mismatch: bytes_per_hop=%d expects %d hex chars per node, nodes=%s", + bph, bph * 2, path_nodes_list[:5] + ) # Log the routing information for analysis if routing_info['path_length'] > 0: - # Format path with configured node length (match decode layer) - path_hex = routing_info['path_hex'] - path_nodes_fmt = self._path_hex_to_nodes(path_hex) - formatted_path = ','.join(path_nodes_fmt) + # Use path_nodes when present (multi-byte); else chunk path_hex + path_nodes_list = routing_info.get('path_nodes') or [] + if path_nodes_list: + formatted_path = ','.join(str(n).lower() for n in path_nodes_list) + else: + path_hex = routing_info['path_hex'] + path_nodes_fmt = self._path_hex_to_nodes(path_hex) + formatted_path = ','.join(path_nodes_fmt) path_bytes_str = decoded_packet.get('path_byte_length', routing_info['path_length']) log_message = f"🛣️ ROUTING INFO: {routing_info['route_type']} | Path: {formatted_path} ({routing_info['path_length']} hops, {path_bytes_str} bytes) | Payload: {routing_info['payload_length']} bytes | Type: {routing_info['payload_type']}" self.logger.info(log_message) @@ -1375,6 +1402,55 @@ class MessageHandler: nodes = [path_hex[i:i + 2].lower() for i in range(0, len(path_hex), 2)] return nodes + def _get_path_from_rf_data( + self, + rf_data: Dict[str, Any], + payload_hex: Optional[str] = None, + packet_info: Optional[Dict[str, Any]] = None + ) -> Tuple[Optional[str], Optional[List[str]], int]: + """Get path string, path nodes, and hop count from RF data (single source for path extraction). + + Prefers routing_info.path_nodes when present (no re-decode; correct multi-byte). + Otherwise decodes (or uses provided packet_info) and gets path from decoder's 'path' + or chunks path_hex using bytes_per_hop from the packet. + + Returns: + (path_string, path_nodes, hops). path_nodes is a list for mesh graph; hops is path_length or 255. + """ + routing_info = rf_data.get('routing_info') or {} + path_nodes_list = routing_info.get('path_nodes') + if path_nodes_list: + path_str = ','.join(str(n).lower() for n in path_nodes_list) + return (path_str, list(path_nodes_list), len(path_nodes_list)) + raw_hex = rf_data.get('raw_hex') + if not raw_hex: + return (None, None, 255) + if packet_info is None: + payload = payload_hex or rf_data.get('payload') + packet_info = self.decode_meshcore_packet(raw_hex, payload) + if not packet_info: + return (None, None, 255) + hops = packet_info.get('path_len', 255) + path_nodes_list = packet_info.get('path_nodes') or packet_info.get('path') or [] + if path_nodes_list: + path_str = ','.join(str(n).lower() for n in path_nodes_list) + return (path_str, list(path_nodes_list), len(path_nodes_list)) + path_hex = packet_info.get('path_hex', '') + if path_hex and len(path_hex) >= 2: + bytes_per_hop = packet_info.get('bytes_per_hop', 1) + n = (bytes_per_hop * 2) if bytes_per_hop and bytes_per_hop >= 1 else 2 + path_nodes_list = [path_hex[i:i + n].lower() for i in range(0, len(path_hex), n)] + if (len(path_hex) % n) != 0: + path_nodes_list = [path_hex[i:i + 2].lower() for i in range(0, len(path_hex), 2)] + if path_nodes_list: + return (','.join(path_nodes_list), path_nodes_list, len(path_nodes_list)) + path_info = packet_info.get('path_info') or {} + path_nodes_list = path_info.get('path') or [] + if path_nodes_list: + path_str = ','.join(str(n).lower() for n in path_nodes_list) + return (path_str, list(path_nodes_list), len(path_nodes_list)) + return (None, None, hops) + def _process_packet_path(self, path_bytes: bytes, payload: bytes, route_type: RouteType, payload_type: PayloadType) -> dict: """ @@ -1637,96 +1713,45 @@ class MessageHandler: rssi = recent_rf_data['rssi'] self.logger.debug(f"Using RSSI from RF data: {rssi}") - # Try to extract path information from raw hex directly + # Single path source: prefer routing_info, else decode/fallback via helper path_string = None hops = payload.get('path_len', 255) - - # First try the packet decoder - # Use payload field if available, otherwise use raw_hex payload_hex = recent_rf_data.get('payload') packet_info = self.decode_meshcore_packet(raw_hex, payload_hex) - - # Get packet_hash from recent_rf_data if available (for trace correlation) packet_hash = recent_rf_data.get('packet_hash') if packet_hash and packet_info: packet_info['packet_hash'] = packet_hash - if packet_info and packet_info.get('path_len') is not None: - # Valid packet decoded - use the results even if path is empty (0 hops = direct) hops = packet_info.get('path_len', 0) - - # Check if this is a TRACE packet with SNR data if packet_info.get('payload_type') == 9: # TRACE packet - # For TRACE packets, extract routing path from payload pathHashes - # The path field contains SNR data, but the actual routing path is in payload path_info = packet_info.get('path_info', {}) path_hashes = path_info.get('path_hashes') or path_info.get('path', []) - if path_hashes: - # Convert pathHashes to path string path_string = ','.join(path_hashes) - self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string} ({len(path_hashes)} hops)") - - # Update mesh graph with trace path - bot is the destination, so we can confirm these edges - # Since the bot received this trace packet, it's the destination node + self.logger.debug(f"Path from TRACE packet: {path_string} ({len(path_hashes)} hops)") if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: self._update_mesh_graph_from_trace(path_hashes, packet_info) else: path_string = "Direct" if hops == 0 else f"Unknown routing ({hops} hops)" - self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string}") + self.logger.debug(f"Path from TRACE packet: {path_string}") else: - # For all other packet types, try multiple methods to get the path - path_string = None - - # Method 1: Try path_nodes field first - path_nodes = packet_info.get('path_nodes', []) - if path_nodes: - path_string = ','.join(path_nodes) - self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)") - # Update mesh graph with path edges - if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: - self._update_mesh_graph(path_nodes, packet_info) - else: - # Method 2: Try path_hex field - path_hex = packet_info.get('path_hex', '') - if path_hex and len(path_hex) >= 2: - path_nodes = self._path_hex_to_nodes(path_hex) - path_string = ','.join(path_nodes) - self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET HEX: {path_string} ({hops} hops)") - # Update mesh graph with path edges - if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: - self._update_mesh_graph(path_nodes, packet_info) - else: - # Method 3: Try path_info.path field - path_info = packet_info.get('path_info', {}) - if path_info and path_info.get('path'): - path_nodes = path_info['path'] - path_string = ','.join(path_nodes) - self.logger.info(f"🎯 EXTRACTED PATH FROM PATH_INFO: {path_string} ({hops} hops)") - # Update mesh graph with path edges - if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: - self._update_mesh_graph(path_nodes, packet_info) - else: - # No path found - this is truly unknown - path_string = "Direct" if hops == 0 else "Unknown routing" - self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)") + had_routing_nodes = bool((recent_rf_data.get('routing_info') or {}).get('path_nodes')) + path_string, path_nodes, hops = self._get_path_from_rf_data( + recent_rf_data, payload_hex=payload_hex, packet_info=packet_info + ) + if path_string and path_nodes and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled: + self._update_mesh_graph(path_nodes, packet_info) + if path_string and not had_routing_nodes: + self.logger.debug(f"Path from fallback decode: {path_string} ({hops} hops)") else: - # Packet decoding failed - try to extract path directly from raw hex - self.logger.debug("Packet decoding failed, trying direct hex parsing") + self.logger.debug("Packet decoding failed, trying direct hex or routing_info fallback") path_string = self.extract_path_from_raw_hex(raw_hex, hops) - if path_string: - self.logger.info(f"🎯 EXTRACTED PATH FROM RAW HEX: {path_string} ({hops} hops)") - else: - # Try to use routing info from RF data as fallback - if recent_rf_data.get('routing_info') and recent_rf_data['routing_info'].get('path_nodes'): - routing_info = recent_rf_data['routing_info'] - hops = len(routing_info['path_nodes']) - path_string = ','.join(routing_info['path_nodes']) - self.logger.info(f"🎯 EXTRACTED PATH FROM RF ROUTING INFO: {path_string} ({hops} hops)") - else: - # Final fallback to basic path info - self.logger.debug("No path info available, using basic path info") - path_string = None + if not path_string and recent_rf_data.get('routing_info') and recent_rf_data['routing_info'].get('path_nodes'): + routing_info = recent_rf_data['routing_info'] + path_nodes = routing_info['path_nodes'] + hops = len(path_nodes) + path_string = ','.join(str(n).lower() for n in path_nodes) + self.logger.debug(f"Path from RF routing_info fallback: {path_string} ({hops} hops)") else: self.logger.warning("❌ NO RF DATA found for channel message after all correlation attempts") hops = payload.get('path_len', 255) @@ -1760,6 +1785,8 @@ class MessageHandler: elapsed=_elapsed, is_dm=False ) + if recent_rf_data and recent_rf_data.get('routing_info'): + message.routing_info = recent_rf_data['routing_info'] # Path information is now set directly in the MeshMessage constructor from RF data # No need for additional path extraction since we're using the actual routing data diff --git a/modules/models.py b/modules/models.py index 740c2e4..c3bbb74 100644 --- a/modules/models.py +++ b/modules/models.py @@ -5,7 +5,7 @@ Contains shared data structures used across modules """ from dataclasses import dataclass -from typing import Optional +from typing import Any, Dict, Optional @dataclass @@ -22,3 +22,5 @@ class MeshMessage: snr: Optional[float] = None rssi: Optional[int] = None elapsed: Optional[str] = None + # When set from RF routing: path_nodes, path_hex, bytes_per_hop, path_length, route_type, etc. + routing_info: Optional[Dict[str, Any]] = None diff --git a/pyproject.toml b/pyproject.toml index 75af6b3..d7b7e9e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "maidenhead>=1.4.0", "pytz>=2023.3", "aiohttp>=3.8.0", - "meshcore>=2.1.6", + "meshcore>=2.2.14", "openmeteo-requests>=1.7.2", "requests-cache>=1.1.1", "retry-requests>=1.0.0", diff --git a/requirements.txt b/requirements.txt index d4be2dd..9a1c109 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ pytz>=2023.3 aiohttp>=3.8.0 cryptography>=41.0.0 pynacl>=1.5.0 -meshcore>=2.1.6 +meshcore>=2.2.14 openmeteo-requests>=1.7.2 requests-cache>=1.1.1 retry-requests>=1.0.0 diff --git a/tests/unit/test_path_command_multibyte.py b/tests/unit/test_path_command_multibyte.py new file mode 100644 index 0000000..b530b09 --- /dev/null +++ b/tests/unit/test_path_command_multibyte.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +Unit tests for PathCommand multi-byte path support: routing_info usage and comma/prefix parsing. +""" + +import pytest +from unittest.mock import Mock +from modules.commands.path_command import PathCommand +from modules.models import MeshMessage + + +@pytest.mark.unit +class TestPathCommandDecodePathMultibyte: + """Test _decode_path parsing: comma-separated inference and parse_path_string fallback.""" + + @pytest.fixture + def path_command(self, mock_bot): + """Create a PathCommand instance.""" + return PathCommand(mock_bot) + + @pytest.mark.asyncio + async def test_comma_separated_two_byte_infers_four_char_nodes(self, path_command, mock_bot): + """path 0102,5f7e -> 2 nodes (0102, 5F7E) even when prefix_hex_chars=2 would give 4 nodes for continuous.""" + mock_bot.prefix_hex_chars = 2 + captured = [] + + async def capture_lookup(node_ids, lookup_func=None): + captured.append(node_ids) + return {nid: {'found': True, 'name': nid} for nid in node_ids} + + path_command._lookup_repeater_names = capture_lookup + await path_command._decode_path("0102,5f7e") + assert len(captured) == 1 + assert captured[0] == ['0102', '5F7E'] + + @pytest.mark.asyncio + async def test_comma_separated_one_byte_two_nodes(self, path_command, mock_bot): + """path 01,5f -> 2 nodes (01, 5F).""" + mock_bot.prefix_hex_chars = 2 + captured = [] + + async def capture_lookup(node_ids, lookup_func=None): + captured.append(node_ids) + return {nid: {'found': True, 'name': nid} for nid in node_ids} + + path_command._lookup_repeater_names = capture_lookup + await path_command._decode_path("01,5f") + assert len(captured) == 1 + assert captured[0] == ['01', '5F'] + + @pytest.mark.asyncio + async def test_continuous_hex_uses_prefix_hex_chars(self, path_command, mock_bot): + """path 01025f7e: prefix_hex_chars=2 -> 4 nodes (1-byte); prefix_hex_chars=4 -> 2 nodes (2-byte).""" + captured = [] + + async def capture_lookup(node_ids, lookup_func=None): + captured.append(list(node_ids)) + return {nid: {'found': True, 'name': nid} for nid in node_ids} + + path_command._lookup_repeater_names = capture_lookup + + mock_bot.prefix_hex_chars = 2 # 2 hex chars per node = 1 byte per hop + captured.clear() + await path_command._decode_path("01025f7e") + assert len(captured) == 1 + assert captured[0] == ['01', '02', '5F', '7E'] + + mock_bot.prefix_hex_chars = 4 # 4 hex chars per node = 2 bytes per hop + captured.clear() + await path_command._decode_path("01025f7e") + assert len(captured) == 1 + assert captured[0] == ['0102', '5F7E'] + + @pytest.mark.asyncio + async def test_strips_hop_count_suffix(self, path_command, mock_bot): + """path 01,5f (2 hops) -> 2 nodes.""" + mock_bot.prefix_hex_chars = 2 + captured = [] + + async def capture_lookup(node_ids, lookup_func=None): + captured.append(node_ids) + return {nid: {'found': True, 'name': nid} for nid in node_ids} + + path_command._lookup_repeater_names = capture_lookup + await path_command._decode_path("01,5f (2 hops)") + assert len(captured) == 1 + assert captured[0] == ['01', '5F'] + + +@pytest.mark.unit +class TestPathCommandExtractPathUsesRoutingInfo: + """Test _extract_path_from_recent_messages prefers routing_info.path_nodes when present.""" + + @pytest.fixture + def path_command(self, mock_bot): + """Create a PathCommand instance.""" + return PathCommand(mock_bot) + + @pytest.mark.asyncio + async def test_uses_routing_info_path_nodes_when_present(self, path_command, mock_bot): + """When message has routing_info with path_nodes, use them directly (no _decode_path).""" + path_command._current_message = MeshMessage( + content="path", + path="0102,5f7e (2 hops via FLOOD)", + routing_info={ + 'path_length': 2, + 'path_nodes': ['0102', '5f7e'], + 'path_hex': '01025f7e', + 'bytes_per_hop': 2, + 'route_type': 'FLOOD', + }, + ) + captured = [] + + async def capture_lookup(node_ids, lookup_func=None): + captured.append(list(node_ids)) + return {nid: {'found': True, 'name': nid} for nid in node_ids} + + path_command._lookup_repeater_names = capture_lookup + decode_path_called = [] + + async def track_decode(path_input): + decode_path_called.append(path_input) + return "decode_path_result" + + path_command._decode_path = track_decode + path_command.translate = lambda key, **kwargs: f"msg:{kwargs.get('node_id', kwargs.get('name', key))}" + + result = await path_command._extract_path_from_recent_messages() + + assert len(captured) == 1 + assert captured[0] == ['0102', '5F7E'] + assert len(decode_path_called) == 0, "Should not call _decode_path when routing_info.path_nodes present" + assert '0102' in result and '5F7E' in result + + @pytest.mark.asyncio + async def test_direct_connection_when_routing_info_path_length_zero(self, path_command, mock_bot): + """When routing_info.path_length is 0, return direct connection message.""" + path_command._current_message = MeshMessage( + content="path", + path="Direct via FLOOD", + routing_info={'path_length': 0, 'path_nodes': [], 'route_type': 'FLOOD'}, + ) + path_command.translate = lambda key, **kwargs: "Direct connection" if "direct" in key.lower() else key + result = await path_command._extract_path_from_recent_messages() + assert "direct" in result.lower()