diff --git a/modules/commands/multitest_command.py b/modules/commands/multitest_command.py index f283cce..76f53fb 100644 --- a/modules/commands/multitest_command.py +++ b/modules/commands/multitest_command.py @@ -14,7 +14,8 @@ from typing import Literal, Optional CondensePathsMode = Literal["off", "flat", "nested"] from shared.models import MeshMessage -from ..utils import calculate_packet_hash, parse_path_string +from shared.parsers.packet_parser import calculate_packet_hash +from shared.parsers.path_parser import parse_path_string from .base_command import BaseCommand _BRANCH_INTER = "\u251c" # ├ (intermediate branch) diff --git a/modules/commands/path_command.py b/modules/commands/path_command.py index d85117d..5ae6a42 100644 --- a/modules/commands/path_command.py +++ b/modules/commands/path_command.py @@ -13,7 +13,7 @@ from typing import Any, Callable, Optional from shared.models import MeshMessage from shared.security_utils import sanitize_name from shared.geocoding import calculate_distance -from ..utils import bytes_per_hop_from_routing_and_nodes, parse_path_string +from shared.parsers.path_parser import bytes_per_hop_from_routing_and_nodes, parse_path_string from .base_command import BaseCommand diff --git a/modules/message_handler.py b/modules/message_handler.py index abe167b..4536b93 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -15,13 +15,10 @@ from typing import Any, TypedDict from .enums import AdvertFlags, DeviceRole, PayloadType, PayloadVersion, RouteType from .graph_trace_helper import update_mesh_graph_from_trace_data from shared.models import MeshMessage +from shared.parsers.packet_parser import calculate_packet_hash +from shared.parsers.path_parser import decode_path_len_byte, encode_path_len_byte from shared.security_utils import sanitize_input, sanitize_name from shared.text_utils import format_elapsed_display -from .utils import ( - calculate_packet_hash, - decode_path_len_byte, - encode_path_len_byte, -) class PendingMessageEntry(TypedDict): diff --git a/modules/response_template.py b/modules/response_template.py index 531703d..5bc3b3c 100644 --- a/modules/response_template.py +++ b/modules/response_template.py @@ -10,7 +10,7 @@ from __future__ import annotations import re from typing import Any, Callable -from .utils import message_path_bytes_per_hop +from shared.parsers.path_parser import message_path_bytes_per_hop FilterFn = Callable[[str, dict[str, Any], str], str] diff --git a/modules/service_plugins/map_uploader_service.py b/modules/service_plugins/map_uploader_service.py index d16bbaf..2b96597 100644 --- a/modules/service_plugins/map_uploader_service.py +++ b/modules/service_plugins/map_uploader_service.py @@ -37,7 +37,8 @@ except ImportError: # Import private key utilities # Import utilities -from ..utils import decode_path_len_byte, resolve_path +from shared.parsers.path_parser import decode_path_len_byte +from ..utils import resolve_path # Import base service from .base_service import BaseServicePlugin diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index 24bfc9a..4ee361f 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -21,12 +21,8 @@ from meshcore import EventType from ..enums import PayloadType, PayloadVersion, RouteType # Import bot's utilities for packet hash -from ..utils import( - calculate_packet_hash, - decode_path_len_byte, - parse_trace_payload_route_hashes, - verify_meshcore_advert_ed25519, -) +from shared.parsers.packet_parser import calculate_packet_hash, verify_meshcore_advert_ed25519 +from shared.parsers.path_parser import decode_path_len_byte, parse_trace_payload_route_hashes from ..version_info import resolve_runtime_version # Import MQTT client diff --git a/modules/utils.py b/modules/utils.py index e471100..6ababce 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -5,7 +5,6 @@ Shared helper functions used across multiple modules """ import asyncio -import hashlib import re import socket import urllib.error @@ -15,6 +14,19 @@ from pathlib import Path from typing import Any, Optional, Union from shared.geocoding import calculate_distance +from shared.parsers.packet_parser import calculate_packet_hash, verify_meshcore_advert_ed25519 +from shared.parsers.path_parser import ( + _HEX_BYTE_TOKEN, + _normalized_message_path_string, + bytes_per_hop_from_routing_and_nodes, + decode_path_len_byte, + encode_path_len_byte, + extract_path_node_ids_from_message, + message_path_bytes_per_hop, + node_ids_from_path_string, + parse_path_string, + parse_trace_payload_route_hashes, +) from shared.text_utils import format_elapsed_display try: @@ -131,178 +143,6 @@ def format_temperature_high_low( -def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple[int, int] | None: - """Decode the RF packet path_len byte per firmware ``Packet::isValidPathLen``. - - Encoding: low 6 bits = hop count, high 2 bits = size code. - ``bytes_per_hop = (path_len >> 6) + 1`` → 1, 2, 3, or 4 (4 is reserved and invalid). - - Args: - path_len_byte: The single path_len byte from the packet. - max_path_size: Max path bytes (default 64, matches ``MAX_PATH_SIZE``). - - Returns: - ``(path_byte_length, bytes_per_hop)`` if the encoding is valid on the wire. - ``None`` if reserved size class (4) or ``hop_count * bytes_per_hop > max_path_size`` - — matching MeshCore where ``readFrom`` rejects the packet (no legacy reinterpretation). - """ - hop_count = path_len_byte & 63 - size_code = path_len_byte >> 6 - bytes_per_hop = size_code + 1 # 1, 2, 3, or 4 - if bytes_per_hop == 4: - return None - path_byte_length = hop_count * bytes_per_hop - if path_byte_length > max_path_size: - return None - return (path_byte_length, bytes_per_hop) - - -def parse_trace_payload_route_hashes(payload: bytes) -> list[str]: - """Extract TRACE route hash segments from mesh payload (after tag, auth, flags). - - Matches MeshCore: ``bytes_per_hash = 1 << (flags & 3)`` for bytes at ``payload[9:]``. - If the tail length is not a multiple of ``bytes_per_hash``, falls back to 1-byte - segments (same as MessageHandler._process_packet_path). - - Args: - payload: Full mesh payload bytes (not including header/path). - - Returns: - List of uppercase hex strings, one per hop hash. - """ - if len(payload) < 9: - return [] - flags = payload[8] - path_hash_len = 1 << (flags & 3) - if path_hash_len <= 0: - path_hash_len = 1 - path_hashes_bytes = payload[9:] - if not path_hashes_bytes: - return [] - try: - if len(path_hashes_bytes) % path_hash_len == 0: - return [ - path_hashes_bytes[i : i + path_hash_len].hex().upper() - for i in range(0, len(path_hashes_bytes), path_hash_len) - ] - except Exception: - pass - return [f"{b:02X}" for b in path_hashes_bytes] - - -def encode_path_len_byte(hop_count: int, bytes_per_hop: int) -> int: - """Pack hop count and hash size into the single path_len wire byte (inverse of decode_path_len_byte). - - Firmware: low 6 bits = hop count, high 2 bits = size code with bytes_per_hop = (code + 1). - Valid bytes_per_hop are 1, 2, or 3 (size code 4 is reserved). - """ - if bytes_per_hop not in (1, 2, 3): - raise ValueError(f"bytes_per_hop must be 1, 2, or 3, got {bytes_per_hop}") - hop_count = int(hop_count) & 0x3F - size_code = (int(bytes_per_hop) - 1) & 0x03 - return (size_code << 6) | hop_count - - -def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> str: - """Calculate hash for packet identification - based on packet.cpp. - - Packet hashes are unique to the originally sent message, allowing - identification of the same message arriving via different paths. - - Args: - raw_hex: Raw packet data as hex string. - payload_type: Optional payload type as integer (if None, extracted from header). - Must be numeric value (0-15). - - Returns: - str: 16-character hex string (8 bytes) in uppercase, or "0000000000000000" on error. - """ - try: - # Parse the packet to extract payload type and payload data - byte_data = bytes.fromhex(raw_hex) - header = byte_data[0] - - # Get payload type from header (bits 2-5) - if payload_type is None: - payload_type = (header >> 2) & 0x0F - else: - # Ensure payload_type is an integer (handle enum.value if passed) - if hasattr(payload_type, 'value'): - payload_type = payload_type.value - payload_type = int(payload_type) & 0x0F # Ensure it's 0-15 - - # Check if transport codes are present - route_type = header & 0x03 - has_transport = route_type in [0x00, 0x03] # TRANSPORT_FLOOD or TRANSPORT_DIRECT - - # Calculate path length offset dynamically based on transport codes - offset = 1 # After header - if has_transport: - offset += 4 # Skip 4 bytes of transport codes - - # Validate we have enough bytes for path_len - if len(byte_data) <= offset: - return "0000000000000000" - - path_len_byte = byte_data[offset] - offset += 1 - path_parts = decode_path_len_byte(path_len_byte) - if path_parts is None: - return "0000000000000000" - path_byte_length, _ = path_parts - - # Validate we have enough bytes for the path - if len(byte_data) < offset + path_byte_length: - return "0000000000000000" - - # Skip past the path to get to payload - payload_start = offset + path_byte_length - - # Validate we have payload data - if len(byte_data) <= payload_start: - return "0000000000000000" - - payload_data = byte_data[payload_start:] - - # Calculate hash exactly like MeshCore Packet::calculatePacketHash(): - # 1. Payload type (1 byte) - # 2. Path length (2 bytes as uint16_t, little-endian) - ONLY for TRACE packets (type 9) - # 3. Payload data - hash_obj = hashlib.sha256() - hash_obj.update(bytes([payload_type])) - - if payload_type == 9: # PAYLOAD_TYPE_TRACE - # C++ does: sha.update(&path_len, sizeof(path_len)) - # path_len is the raw wire byte (uint16_t in firmware), not the decoded byte count - hash_obj.update(path_len_byte.to_bytes(2, byteorder='little')) - - hash_obj.update(payload_data) - - # Return first 16 hex characters (8 bytes) in uppercase - return hash_obj.hexdigest()[:16].upper() - except Exception: - # Return default hash on error (caller should handle logging) - return "0000000000000000" - - -def verify_meshcore_advert_ed25519(mesh_payload: bytes) -> bool: - """Verify MeshCore ADVERT Ed25519 signature (layout from ``Mesh::createAdvert``). - - Signed message is ``pub_key (32) + timestamp (4, LE) + app_data``; signature is - ``payload[36:100]`` (64 bytes); ``app_data`` starts at byte 100. - """ - if len(mesh_payload) < 100: - return False - try: - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey - - pub = mesh_payload[:32] - msg = mesh_payload[:36] + mesh_payload[100:] - sig = mesh_payload[36:100] - Ed25519PublicKey.from_public_bytes(pub).verify(sig, msg) - return True - except Exception: - return False @@ -458,153 +298,6 @@ async def check_internet_connectivity_async(host: str = "8.8.8.8", port: int = 5 return False -def parse_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]: - """Parse a path string to extract node IDs. - - Handles various formats: - - "11,98,a4,49,cd,5f,01" (comma-separated) - - "11 98 a4 49 cd 5f 01" (space-separated) - - "1198a449cd5f01" (continuous hex) - - "01,5f (2 hops)" (with hop count suffix) - - Args: - path_str: Path string in various formats. - prefix_hex_chars: Number of hex characters per node (2 = 1 byte, 4 = 2 bytes). Default 2. - - Returns: - List[str]: List of uppercase hex node IDs (each of length prefix_hex_chars). - """ - if not path_str: - return [] - - # Remove hop count suffix if present (e.g., " (2 hops)") - path_str = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_str, flags=re.IGNORECASE) - path_str = path_str.strip() - - # Replace common separators with spaces - path_str = path_str.replace(',', ' ').replace(':', ' ') - - # Extract hex values using regex (prefix_hex_chars-wide hex tokens) - hex_pattern = rf'[0-9a-fA-F]{{{prefix_hex_chars}}}' - hex_matches = re.findall(hex_pattern, path_str) - - # Legacy fallback: if configured length > 2 and no matches, retry with 2-char (1-byte) nodes - if not hex_matches and prefix_hex_chars > 2: - legacy_pattern = r'[0-9a-fA-F]{2}' - hex_matches = re.findall(legacy_pattern, path_str) - - # Convert to uppercase for consistency - return [match.upper() for match in hex_matches] - - -_HEX_BYTE_TOKEN = frozenset('0123456789aAbBcCdDeEfF') - - -def extract_path_node_ids_from_message(message: Any) -> list[str]: - """Extract path node IDs from a mesh message (MeshCore multi-byte paths). - - Prefers ``routing_info.path_nodes``; else parses comma-separated hop tokens - (2, 4, or 6 hex chars each) from ``message.path``. Matches TestCommand logic. - - Returns: - List of node IDs (uppercase hex). Empty when direct / unparseable. - """ - routing_info = getattr(message, 'routing_info', None) - if routing_info is not None and routing_info.get('path_length', 0) == 0: - return [] - if routing_info and routing_info.get('path_nodes'): - return [str(n).upper().strip() for n in routing_info['path_nodes']] - path_string = getattr(message, 'path', None) or '' - if not path_string or "Direct" in path_string or "0 hops" in path_string: - return [] - if " via ROUTE_TYPE_" in path_string: - path_string = path_string.split(" via ROUTE_TYPE_")[0] - if '(' in path_string: - path_string = path_string.split('(')[0].strip() - if ',' in path_string: - parts = [p.strip() for p in path_string.split(',') if p.strip()] - if parts and all( - len(p) in (2, 4, 6) and all(c in _HEX_BYTE_TOKEN for c in p) - for p in parts - ): - return [p.upper() for p in parts] - return [] - - -def _normalized_message_path_string(message: Any) -> str: - """Strip route suffix and hop-count suffix from message.path for continuous-hex parsing.""" - path_string = (getattr(message, 'path', None) or '').strip() - if not path_string or 'Direct' in path_string or '0 hops' in path_string: - return '' - if ' via ROUTE_TYPE_' in path_string: - path_string = path_string.split(' via ROUTE_TYPE_')[0] - path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip() - return path_string - - -def bytes_per_hop_from_routing_and_nodes( - routing_info: Optional[dict[str, Any]], - node_ids: list[str], -) -> int: - """Bytes per hop from packet routing metadata, else inferred from hex node width. - - When ``routing_info`` includes ``bytes_per_hop`` in 1..3, that value wins. - Otherwise uses minimum half-byte width among ``node_ids`` (comma or path_nodes). - Returns ``1`` when no nodes (direct / unknown). - """ - if routing_info: - bph = routing_info.get('bytes_per_hop') - if isinstance(bph, int) and 1 <= bph <= 3: - return bph - if node_ids: - return min(len(n) // 2 for n in node_ids) - return 1 - - -def message_path_bytes_per_hop(message: Any, *, prefix_hex_chars: int = 2) -> int: - """Best-effort bytes per hop for the message path (RF metadata or inferred from path text). - - Uses ``routing_info.bytes_per_hop`` when present (1..3). Otherwise prefers - :func:`extract_path_node_ids_from_message`, then comma/continuous hex via - :func:`node_ids_from_path_string` using ``prefix_hex_chars`` for legacy paths. - - Returns ``1`` when no usable path (direct / unparseable) so conservative gates - (e.g. ``pathbytes_min:2``) do not treat unknown as multibyte. - """ - routing_info = getattr(message, 'routing_info', None) - node_ids = extract_path_node_ids_from_message(message) - if not node_ids: - ps = _normalized_message_path_string(message) - if ps: - node_ids = node_ids_from_path_string(ps, prefix_hex_chars) - return bytes_per_hop_from_routing_and_nodes(routing_info, node_ids) - - -def node_ids_from_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]: - """Parse path display string into node IDs: multi-byte comma tokens, else fixed-width scan. - - Comma-separated tokens must each be 2, 4, or 6 hex digits (one hop per token). - Otherwise falls back to :func:`parse_path_string` (legacy continuous / 1-byte paths). - """ - if not path_str or not path_str.strip(): - return [] - path_lower = path_str.lower() - if "direct" in path_lower or "0 hops" in path_lower: - return [] - s = path_str.strip() - if " via ROUTE_TYPE_" in s: - s = s.split(" via ROUTE_TYPE_")[0].strip() - s = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', s, flags=re.IGNORECASE).strip() - if not s: - return [] - if ',' in s: - parts = [p.strip() for p in s.split(',') if p.strip()] - if parts and all( - len(p) in (2, 4, 6) and all(c in _HEX_BYTE_TOKEN for c in p) - for p in parts - ): - return [p.upper() for p in parts] - return parse_path_string(s, prefix_hex_chars) def calculate_path_distances( diff --git a/shared/parsers/packet_parser.py b/shared/parsers/packet_parser.py new file mode 100644 index 0000000..0ae0281 --- /dev/null +++ b/shared/parsers/packet_parser.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +""" +Mesh packet hash and advertisement signature utilities. + +Used by both the bot (modules/) and the web viewer (web_viewer/). +""" + +import hashlib +from typing import Optional + +from shared.parsers.path_parser import decode_path_len_byte + + +def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> str: + """Calculate hash for packet identification - based on packet.cpp. + + Packet hashes are unique to the originally sent message, allowing + identification of the same message arriving via different paths. + + Args: + raw_hex: Raw packet data as hex string. + payload_type: Optional payload type as integer (if None, extracted from header). + Must be numeric value (0-15). + + Returns: + str: 16-character hex string (8 bytes) in uppercase, or "0000000000000000" on error. + """ + try: + # Parse the packet to extract payload type and payload data + byte_data = bytes.fromhex(raw_hex) + header = byte_data[0] + + # Get payload type from header (bits 2-5) + if payload_type is None: + payload_type = (header >> 2) & 0x0F + else: + # Ensure payload_type is an integer (handle enum.value if passed) + if hasattr(payload_type, 'value'): + payload_type = payload_type.value + payload_type = int(payload_type) & 0x0F # Ensure it's 0-15 + + # Check if transport codes are present + route_type = header & 0x03 + has_transport = route_type in [0x00, 0x03] # TRANSPORT_FLOOD or TRANSPORT_DIRECT + + # Calculate path length offset dynamically based on transport codes + offset = 1 # After header + if has_transport: + offset += 4 # Skip 4 bytes of transport codes + + # Validate we have enough bytes for path_len + if len(byte_data) <= offset: + return "0000000000000000" + + path_len_byte = byte_data[offset] + offset += 1 + path_parts = decode_path_len_byte(path_len_byte) + if path_parts is None: + return "0000000000000000" + path_byte_length, _ = path_parts + + # Validate we have enough bytes for the path + if len(byte_data) < offset + path_byte_length: + return "0000000000000000" + + # Skip past the path to get to payload + payload_start = offset + path_byte_length + + # Validate we have payload data + if len(byte_data) <= payload_start: + return "0000000000000000" + + payload_data = byte_data[payload_start:] + + # Calculate hash exactly like MeshCore Packet::calculatePacketHash(): + # 1. Payload type (1 byte) + # 2. Path length (2 bytes as uint16_t, little-endian) - ONLY for TRACE packets (type 9) + # 3. Payload data + hash_obj = hashlib.sha256() + hash_obj.update(bytes([payload_type])) + + if payload_type == 9: # PAYLOAD_TYPE_TRACE + # C++ does: sha.update(&path_len, sizeof(path_len)) + # path_len is the raw wire byte (uint16_t in firmware), not the decoded byte count + hash_obj.update(path_len_byte.to_bytes(2, byteorder='little')) + + hash_obj.update(payload_data) + + # Return first 16 hex characters (8 bytes) in uppercase + return hash_obj.hexdigest()[:16].upper() + except Exception: + # Return default hash on error (caller should handle logging) + return "0000000000000000" + + +def verify_meshcore_advert_ed25519(mesh_payload: bytes) -> bool: + """Verify MeshCore ADVERT Ed25519 signature (layout from ``Mesh::createAdvert``). + + Signed message is ``pub_key (32) + timestamp (4, LE) + app_data``; signature is + ``payload[36:100]`` (64 bytes); ``app_data`` starts at byte 100. + """ + if len(mesh_payload) < 100: + return False + try: + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + pub = mesh_payload[:32] + msg = mesh_payload[:36] + mesh_payload[100:] + sig = mesh_payload[36:100] + Ed25519PublicKey.from_public_bytes(pub).verify(sig, msg) + return True + except Exception: + return False diff --git a/shared/parsers/path_parser.py b/shared/parsers/path_parser.py new file mode 100644 index 0000000..44e0569 --- /dev/null +++ b/shared/parsers/path_parser.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +""" +Canonical path string / path-len byte parsers. + +Used by both the bot (modules/) and the web viewer (web_viewer/). +These replace three prior duplicated implementations in message_handler.py, +web_viewer/app.py, and utils.py. +""" + +import re +from typing import Any, Optional + + +def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple[int, int] | None: + """Decode the RF packet path_len byte per firmware ``Packet::isValidPathLen``. + + Encoding: low 6 bits = hop count, high 2 bits = size code. + ``bytes_per_hop = (path_len >> 6) + 1`` → 1, 2, 3, or 4 (4 is reserved and invalid). + + Args: + path_len_byte: The single path_len byte from the packet. + max_path_size: Max path bytes (default 64, matches ``MAX_PATH_SIZE``). + + Returns: + ``(path_byte_length, bytes_per_hop)`` if the encoding is valid on the wire. + ``None`` if reserved size class (4) or ``hop_count * bytes_per_hop > max_path_size`` + — matching MeshCore where ``readFrom`` rejects the packet (no legacy reinterpretation). + """ + hop_count = path_len_byte & 63 + size_code = path_len_byte >> 6 + bytes_per_hop = size_code + 1 # 1, 2, 3, or 4 + if bytes_per_hop == 4: + return None + path_byte_length = hop_count * bytes_per_hop + if path_byte_length > max_path_size: + return None + return (path_byte_length, bytes_per_hop) + + +def parse_trace_payload_route_hashes(payload: bytes) -> list[str]: + """Extract TRACE route hash segments from mesh payload (after tag, auth, flags). + + Matches MeshCore: ``bytes_per_hash = 1 << (flags & 3)`` for bytes at ``payload[9:]``. + If the tail length is not a multiple of ``bytes_per_hash``, falls back to 1-byte + segments (same as MessageHandler._process_packet_path). + + Args: + payload: Full mesh payload bytes (not including header/path). + + Returns: + List of uppercase hex strings, one per hop hash. + """ + if len(payload) < 9: + return [] + flags = payload[8] + path_hash_len = 1 << (flags & 3) + if path_hash_len <= 0: + path_hash_len = 1 + path_hashes_bytes = payload[9:] + if not path_hashes_bytes: + return [] + try: + if len(path_hashes_bytes) % path_hash_len == 0: + return [ + path_hashes_bytes[i : i + path_hash_len].hex().upper() + for i in range(0, len(path_hashes_bytes), path_hash_len) + ] + except Exception: + pass + return [f"{b:02X}" for b in path_hashes_bytes] + + +def encode_path_len_byte(hop_count: int, bytes_per_hop: int) -> int: + """Pack hop count and hash size into the single path_len wire byte (inverse of decode_path_len_byte). + + Firmware: low 6 bits = hop count, high 2 bits = size code with bytes_per_hop = (code + 1). + Valid bytes_per_hop are 1, 2, or 3 (size code 4 is reserved). + """ + if bytes_per_hop not in (1, 2, 3): + raise ValueError(f"bytes_per_hop must be 1, 2, or 3, got {bytes_per_hop}") + hop_count = int(hop_count) & 0x3F + size_code = (int(bytes_per_hop) - 1) & 0x03 + return (size_code << 6) | hop_count + + +def parse_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]: + """Parse a path string to extract node IDs. + + Handles various formats: + - "11,98,a4,49,cd,5f,01" (comma-separated) + - "11 98 a4 49 cd 5f 01" (space-separated) + - "1198a449cd5f01" (continuous hex) + - "01,5f (2 hops)" (with hop count suffix) + + Args: + path_str: Path string in various formats. + prefix_hex_chars: Number of hex characters per node (2 = 1 byte, 4 = 2 bytes). Default 2. + + Returns: + List[str]: List of uppercase hex node IDs (each of length prefix_hex_chars). + """ + if not path_str: + return [] + + # Remove hop count suffix if present (e.g., " (2 hops)") + path_str = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_str, flags=re.IGNORECASE) + path_str = path_str.strip() + + # Replace common separators with spaces + path_str = path_str.replace(',', ' ').replace(':', ' ') + + # Extract hex values using regex (prefix_hex_chars-wide hex tokens) + hex_pattern = rf'[0-9a-fA-F]{{{prefix_hex_chars}}}' + hex_matches = re.findall(hex_pattern, path_str) + + # Legacy fallback: if configured length > 2 and no matches, retry with 2-char (1-byte) nodes + if not hex_matches and prefix_hex_chars > 2: + legacy_pattern = r'[0-9a-fA-F]{2}' + hex_matches = re.findall(legacy_pattern, path_str) + + # Convert to uppercase for consistency + return [match.upper() for match in hex_matches] + + +_HEX_BYTE_TOKEN = frozenset('0123456789aAbBcCdDeEfF') + + +def extract_path_node_ids_from_message(message: Any) -> list[str]: + """Extract path node IDs from a mesh message (MeshCore multi-byte paths). + + Prefers ``routing_info.path_nodes``; else parses comma-separated hop tokens + (2, 4, or 6 hex chars each) from ``message.path``. Matches TestCommand logic. + + Returns: + List of node IDs (uppercase hex). Empty when direct / unparseable. + """ + routing_info = getattr(message, 'routing_info', None) + if routing_info is not None and routing_info.get('path_length', 0) == 0: + return [] + if routing_info and routing_info.get('path_nodes'): + return [str(n).upper().strip() for n in routing_info['path_nodes']] + path_string = getattr(message, 'path', None) or '' + if not path_string or "Direct" in path_string or "0 hops" in path_string: + return [] + if " via ROUTE_TYPE_" in path_string: + path_string = path_string.split(" via ROUTE_TYPE_")[0] + if '(' in path_string: + path_string = path_string.split('(')[0].strip() + if ',' in path_string: + parts = [p.strip() for p in path_string.split(',') if p.strip()] + if parts and all( + len(p) in (2, 4, 6) and all(c in _HEX_BYTE_TOKEN for c in p) + for p in parts + ): + return [p.upper() for p in parts] + return [] + + +def _normalized_message_path_string(message: Any) -> str: + """Strip route suffix and hop-count suffix from message.path for continuous-hex parsing.""" + path_string = (getattr(message, 'path', None) or '').strip() + if not path_string or 'Direct' in path_string or '0 hops' in path_string: + return '' + if ' via ROUTE_TYPE_' in path_string: + path_string = path_string.split(' via ROUTE_TYPE_')[0] + path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip() + return path_string + + +def bytes_per_hop_from_routing_and_nodes( + routing_info: Optional[dict[str, Any]], + node_ids: list[str], +) -> int: + """Bytes per hop from packet routing metadata, else inferred from hex node width. + + When ``routing_info`` includes ``bytes_per_hop`` in 1..3, that value wins. + Otherwise uses minimum half-byte width among ``node_ids`` (comma or path_nodes). + Returns ``1`` when no nodes (direct / unknown). + """ + if routing_info: + bph = routing_info.get('bytes_per_hop') + if isinstance(bph, int) and 1 <= bph <= 3: + return bph + if node_ids: + return min(len(n) // 2 for n in node_ids) + return 1 + + +def message_path_bytes_per_hop(message: Any, *, prefix_hex_chars: int = 2) -> int: + """Best-effort bytes per hop for the message path (RF metadata or inferred from path text). + + Uses ``routing_info.bytes_per_hop`` when present (1..3). Otherwise prefers + :func:`extract_path_node_ids_from_message`, then comma/continuous hex via + :func:`node_ids_from_path_string` using ``prefix_hex_chars`` for legacy paths. + + Returns ``1`` when no usable path (direct / unparseable) so conservative gates + (e.g. ``pathbytes_min:2``) do not treat unknown as multibyte. + """ + routing_info = getattr(message, 'routing_info', None) + node_ids = extract_path_node_ids_from_message(message) + if not node_ids: + ps = _normalized_message_path_string(message) + if ps: + node_ids = node_ids_from_path_string(ps, prefix_hex_chars) + return bytes_per_hop_from_routing_and_nodes(routing_info, node_ids) + + +def node_ids_from_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]: + """Parse path display string into node IDs: multi-byte comma tokens, else fixed-width scan. + + Comma-separated tokens must each be 2, 4, or 6 hex digits (one hop per token). + Otherwise falls back to :func:`parse_path_string` (legacy continuous / 1-byte paths). + """ + if not path_str or not path_str.strip(): + return [] + path_lower = path_str.lower() + if "direct" in path_lower or "0 hops" in path_lower: + return [] + s = path_str.strip() + if " via ROUTE_TYPE_" in s: + s = s.split(" via ROUTE_TYPE_")[0].strip() + s = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', s, flags=re.IGNORECASE).strip() + if not s: + return [] + if ',' in s: + parts = [p.strip() for p in s.split(',') if p.strip()] + if parts and all( + len(p) in (2, 4, 6) and all(c in _HEX_BYTE_TOKEN for c in p) + for p in parts + ): + return [p.upper() for p in parts] + return parse_path_string(s, prefix_hex_chars) diff --git a/tests/test_utils.py b/tests/test_utils.py index f171307..d57293d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -18,21 +18,22 @@ from shared.text_utils import ( format_location_for_display, truncate_string, ) -from modules.utils import ( - calculate_packet_hash, - calculate_path_distances, - check_internet_connectivity, +from shared.parsers.packet_parser import calculate_packet_hash, verify_meshcore_advert_ed25519 +from shared.parsers.path_parser import ( decode_path_len_byte, encode_path_len_byte, extract_path_node_ids_from_message, - format_keyword_response_with_placeholders, - get_config_timezone, - is_valid_timezone, node_ids_from_path_string, parse_path_string, parse_trace_payload_route_hashes, +) +from modules.utils import ( + calculate_path_distances, + check_internet_connectivity, + format_keyword_response_with_placeholders, + get_config_timezone, + is_valid_timezone, resolve_path, - verify_meshcore_advert_ed25519, ) diff --git a/tests/unit/test_packet_capture_trace_decode.py b/tests/unit/test_packet_capture_trace_decode.py index f94fc31..dc211f2 100644 --- a/tests/unit/test_packet_capture_trace_decode.py +++ b/tests/unit/test_packet_capture_trace_decode.py @@ -4,7 +4,7 @@ import hashlib import logging from modules.service_plugins.packet_capture_service import PacketCaptureService -from modules.utils import calculate_packet_hash +from shared.parsers.packet_parser import calculate_packet_hash def test_calculate_packet_hash_trace_uses_wire_byte(): diff --git a/tests/unit/test_response_template.py b/tests/unit/test_response_template.py index fd92565..0870ddb 100644 --- a/tests/unit/test_response_template.py +++ b/tests/unit/test_response_template.py @@ -9,7 +9,7 @@ import pytest from modules.commands.test_command import TestCommand as MeshTestCommand from shared.models import MeshMessage from modules.response_template import format_piped_template -from modules.utils import message_path_bytes_per_hop +from shared.parsers.path_parser import message_path_bytes_per_hop @pytest.mark.unit