Files
meshcore-bot/shared/parsers/path_parser.py
T
agessaman 8f04d8ceb6 refactor(shared): create shared/parsers/ for path and packet parsing
Extracts canonical path-parsing and packet-hashing logic from
modules/utils.py into shared/parsers/path_parser.py and
shared/parsers/packet_parser.py, eliminating duplication across
three prior implementations. All callers updated to import from
shared.parsers.*; modules/utils.py re-exports via shared imports
for any remaining internal callers. 2866 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 20:50:59 -07:00

233 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""
Canonical path string / path-len byte parsers.
Used by both the bot (modules/) and the web viewer (web_viewer/).
These replace three prior duplicated implementations in message_handler.py,
web_viewer/app.py, and utils.py.
"""
import re
from typing import Any, Optional
def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple[int, int] | None:
"""Decode the RF packet path_len byte per firmware ``Packet::isValidPathLen``.
Encoding: low 6 bits = hop count, high 2 bits = size code.
``bytes_per_hop = (path_len >> 6) + 1`` → 1, 2, 3, or 4 (4 is reserved and invalid).
Args:
path_len_byte: The single path_len byte from the packet.
max_path_size: Max path bytes (default 64, matches ``MAX_PATH_SIZE``).
Returns:
``(path_byte_length, bytes_per_hop)`` if the encoding is valid on the wire.
``None`` if reserved size class (4) or ``hop_count * bytes_per_hop > max_path_size``
— matching MeshCore where ``readFrom`` rejects the packet (no legacy reinterpretation).
"""
hop_count = path_len_byte & 63
size_code = path_len_byte >> 6
bytes_per_hop = size_code + 1 # 1, 2, 3, or 4
if bytes_per_hop == 4:
return None
path_byte_length = hop_count * bytes_per_hop
if path_byte_length > max_path_size:
return None
return (path_byte_length, bytes_per_hop)
def parse_trace_payload_route_hashes(payload: bytes) -> list[str]:
    """Extract TRACE route hash segments from mesh payload (after tag, auth, flags).

    The byte at ``payload[8]`` is read as the flags byte and the hash width is
    ``bytes_per_hash = 1 << (flags & 3)`` (1, 2, 4 or 8). The hashes are the
    bytes at ``payload[9:]``. If that tail is not a whole multiple of
    ``bytes_per_hash``, it is split into 1-byte segments instead.

    Args:
        payload: Full mesh payload bytes (not including header/path).

    Returns:
        List of uppercase hex strings, one per hop hash. Empty when the payload
        is shorter than 9 bytes or carries no hash bytes.
    """
    if len(payload) < 9:
        return []

    flags = payload[8]
    # A shift by 0..3 is always positive, so no non-positive guard is needed.
    bytes_per_hash = 1 << (flags & 3)

    # Normalise to immutable bytes so slicing and .hex() work for any bytes-like input.
    tail = bytes(payload[9:])
    if not tail:
        return []

    if len(tail) % bytes_per_hash != 0:
        # Misaligned tail: fall back to treating every byte as its own hop hash.
        bytes_per_hash = 1

    return [
        tail[start:start + bytes_per_hash].hex().upper()
        for start in range(0, len(tail), bytes_per_hash)
    ]
def encode_path_len_byte(hop_count: int, bytes_per_hop: int) -> int:
    """Build the single path_len wire byte from a hop count and a per-hop hash width.

    Layout: the low 6 bits carry the hop count and the high 2 bits carry the
    size code, where ``size code = bytes_per_hop - 1``. Only widths of 1, 2 or 3
    bytes are accepted (4 bytes per hop, i.e. size code 3, is reserved).

    Args:
        hop_count: Number of hops. Only the low 6 bits are kept, so values above
            63 wrap around (e.g. 64 encodes as 0).
        bytes_per_hop: Hash width in bytes; must be 1, 2 or 3.

    Returns:
        The packed byte value in the range 0..191.

    Raises:
        ValueError: If ``bytes_per_hop`` is not 1, 2 or 3.
    """
    if bytes_per_hop not in (1, 2, 3):
        raise ValueError(f"bytes_per_hop must be 1, 2, or 3, got {bytes_per_hop}")

    low_bits = 0x3F & int(hop_count)
    high_bits = 0x03 & (int(bytes_per_hop) - 1)
    return low_bits | (high_bits << 6)
def parse_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]:
    """Pull fixed-width hex node IDs out of a path display string.

    Accepted shapes include:
        - "11,98,a4,49,cd,5f,01"  (comma-separated)
        - "11 98 a4 49 cd 5f 01"  (space-separated)
        - "1198a449cd5f01"        (continuous hex)
        - "01,5f (2 hops)"        (with a hop-count suffix)

    Args:
        path_str: The path text to scan.
        prefix_hex_chars: Hex characters per node (2 = 1 byte, 4 = 2 bytes).

    Returns:
        Uppercase hex node IDs in the order found. When ``prefix_hex_chars`` is
        greater than 2 and nothing of that width is found, the text is rescanned
        for 2-character (1-byte) IDs instead.
    """
    if not path_str:
        return []

    # Drop a parenthesised hop-count annotation such as " (2 hops)".
    cleaned = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_str, flags=re.IGNORECASE).strip()

    # Commas and colons are plain separators; turn them into spaces.
    for separator in (',', ':'):
        cleaned = cleaned.replace(separator, ' ')

    tokens = re.findall(rf'[0-9a-fA-F]{{{prefix_hex_chars}}}', cleaned)
    if not tokens and prefix_hex_chars > 2:
        # Legacy paths use 1-byte nodes even when a wider prefix is configured.
        tokens = re.findall(r'[0-9a-fA-F]{2}', cleaned)

    return [token.upper() for token in tokens]
# Characters allowed inside a hex hop token (digits plus both letter cases).
_HEX_BYTE_TOKEN = frozenset('0123456789aAbBcCdDeEfF')


def extract_path_node_ids_from_message(message: Any) -> list[str]:
    """Return the hop node IDs carried by a mesh message.

    ``message.routing_info`` is consulted first: a ``path_length`` of 0 (or a
    missing ``path_length``) yields an empty list, and a non-empty
    ``path_nodes`` is returned uppercased and stripped. Otherwise
    ``message.path`` is parsed as comma-separated hop tokens, each of which must
    be 2, 4 or 6 hex characters.

    Returns:
        List of node IDs (uppercase hex). Empty when the message is direct, the
        path has no commas, or any token is not a valid hex hop.
    """
    routing = getattr(message, 'routing_info', None)
    if routing is not None:
        if routing.get('path_length', 0) == 0:
            return []
        nodes = routing.get('path_nodes') if routing else None
        if nodes:
            return [str(node).upper().strip() for node in nodes]

    raw_path = getattr(message, 'path', None) or ''
    if not raw_path or "Direct" in raw_path or "0 hops" in raw_path:
        return []

    # Cut off the route-type suffix, then anything from the first parenthesis on.
    raw_path = raw_path.partition(" via ROUTE_TYPE_")[0]
    head, paren, _ = raw_path.partition('(')
    if paren:
        raw_path = head.strip()

    if ',' not in raw_path:
        return []

    tokens = [piece.strip() for piece in raw_path.split(',') if piece.strip()]
    if not tokens:
        return []
    for token in tokens:
        if len(token) not in (2, 4, 6) or not _HEX_BYTE_TOKEN.issuperset(token):
            return []
    return [token.upper() for token in tokens]
def _normalized_message_path_string(message: Any) -> str:
    """Return ``message.path`` reduced to its bare hop text.

    The route-type suffix (" via ROUTE_TYPE_...") and any parenthesised
    hop-count suffix are removed. An empty string is returned when the path is
    missing, blank, or contains "Direct" or "0 hops".
    """
    text = (getattr(message, 'path', None) or '').strip()
    if not text:
        return ''
    if 'Direct' in text or '0 hops' in text:
        return ''

    # Keep only what precedes the route-type marker, if there is one.
    text = text.partition(' via ROUTE_TYPE_')[0]
    without_hops = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', text, flags=re.IGNORECASE)
    return without_hops.strip()
def bytes_per_hop_from_routing_and_nodes(
    routing_info: Optional[dict[str, Any]],
    node_ids: list[str],
) -> int:
    """Bytes per hop from packet routing metadata, else inferred from hex node width.

    When ``routing_info`` includes an integer ``bytes_per_hop`` in 1..3, that
    value wins. Otherwise the result is the smallest ``len(node) // 2`` among
    ``node_ids``, never less than 1.

    Args:
        routing_info: Routing metadata mapping, or ``None`` when unavailable.
        node_ids: Hex node ID strings for the path hops.

    Returns:
        Bytes per hop, at least 1. ``1`` when there are no nodes (direct / unknown).
    """
    if routing_info:
        bph = routing_info.get('bytes_per_hop')
        if isinstance(bph, int) and 1 <= bph <= 3:
            return bph
    if node_ids:
        # A token shorter than two hex chars floors to 0 bytes, which is not a
        # usable width; clamp so the result is never below 1.
        return max(1, min(len(n) // 2 for n in node_ids))
    return 1
def message_path_bytes_per_hop(message: Any, *, prefix_hex_chars: int = 2) -> int:
    """Best-effort bytes per hop for a message's path.

    Node IDs are taken from :func:`extract_path_node_ids_from_message`. If that
    yields nothing, the normalized ``message.path`` text is parsed with
    :func:`node_ids_from_path_string` using ``prefix_hex_chars``. The node IDs
    and ``message.routing_info`` are then handed to
    :func:`bytes_per_hop_from_routing_and_nodes`, whose result is returned.

    Args:
        message: Mesh message object; ``routing_info`` and ``path`` are read if present.
        prefix_hex_chars: Hex characters per node used for the path-text fallback.

    Returns:
        Bytes per hop as computed by :func:`bytes_per_hop_from_routing_and_nodes`.
    """
    routing = getattr(message, 'routing_info', None)

    hop_ids = extract_path_node_ids_from_message(message)
    if hop_ids:
        return bytes_per_hop_from_routing_and_nodes(routing, hop_ids)

    # Nothing structured was found; try the display string instead.
    if cleaned := _normalized_message_path_string(message):
        hop_ids = node_ids_from_path_string(cleaned, prefix_hex_chars)
    return bytes_per_hop_from_routing_and_nodes(routing, hop_ids)
def node_ids_from_path_string(path_str: str, prefix_hex_chars: int = 2) -> list[str]:
    """Turn a path display string into a list of node IDs.

    When the cleaned text contains commas and every comma-separated token is
    2, 4 or 6 hex digits, each token is one hop. In every other case the text is
    handed to :func:`parse_path_string`, which scans for fixed-width hex IDs.

    Args:
        path_str: Path display text, optionally with a " via ROUTE_TYPE_..."
            suffix and/or a parenthesised hop-count suffix.
        prefix_hex_chars: Hex characters per node for the fixed-width fallback.

    Returns:
        Uppercase hex node IDs. Empty for blank input or text containing
        "direct" or "0 hops" (case-insensitive).
    """
    if not path_str or not path_str.strip():
        return []

    lowered = path_str.lower()
    if "direct" in lowered or "0 hops" in lowered:
        return []

    text = path_str.strip()
    head, marker, _ = text.partition(" via ROUTE_TYPE_")
    if marker:
        text = head.strip()
    text = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', text, flags=re.IGNORECASE).strip()
    if not text:
        return []

    if ',' in text:
        tokens = [tok for tok in (raw.strip() for raw in text.split(',')) if tok]
        is_multibyte_list = bool(tokens) and all(
            len(tok) in (2, 4, 6) and _HEX_BYTE_TOKEN.issuperset(tok)
            for tok in tokens
        )
        if is_multibyte_list:
            return [tok.upper() for tok in tokens]

    # Not a clean comma list: fall back to the fixed-width scanner.
    return parse_path_string(text, prefix_hex_chars)