Update dependencies and enhance path handling for multi-byte support

- Updated `meshcore` dependency version to `2.2.14` in both `pyproject.toml` and `requirements.txt`.
- Added multi-byte path support in the `PathCommand`, allowing for 1-, 2-, and 3-byte-per-hop paths.
- Enhanced `MessageHandler` to utilize `routing_info` for accurate path extraction and validation.
- Improved path extraction methods in `MultitestCommand` and `TestCommand` to prefer `routing_info` for node IDs.
- Refactored path handling logic across various commands to ensure consistent multi-byte path processing.
This commit is contained in:
agessaman
2026-03-05 13:37:38 -08:00
parent 4e5addd5df
commit 7d76ed443b
10 changed files with 495 additions and 255 deletions
+9
View File
@@ -2,6 +2,15 @@
This document explains all configuration parameters for the Path Command, which decodes hex path data to identify repeaters in message routing paths.
## Multi-byte path support
The path command supports **1-, 2-, and 3-byte-per-hop** paths (2, 4, or 6 hex characters per node).
- **`path` with no arguments**: Uses the current messages decoded path when available (from routing info). No re-parsing; node list and hop size come from the packet.
- **`path <hex>` with arguments**:
- **Comma-separated** (e.g. `path 0102,5f7e`): Hop size is inferred from token length. All tokens must be the same length (2, 4, or 6 hex chars). Example: `0102,5f7e` → two 2-byte hops.
- **Continuous hex** (e.g. `path 01025f7e`): The bots [Bot] **`prefix_bytes`** is used (2 hex chars = 1 byte, 4 = 2 bytes, 6 = 3 bytes). Use comma-separated input to force a multi-byte interpretation when the bot is in 1-byte mode.
## Quick Start: Presets
The Path Command supports three presets that configure multiple related settings:
+23 -16
View File
@@ -825,25 +825,32 @@ class BaseCommand(ABC):
"""Check if this command can execute right now (permissions, cooldown, etc.)"""
return self.can_execute(message)
def get_path_display_string(self, message: MeshMessage) -> str:
"""Get path string for display (test/ack placeholders). Prefers message.routing_info for multi-byte and direct."""
routing_info = getattr(message, 'routing_info', None)
if routing_info is not None:
path_length = routing_info.get('path_length', 0)
if path_length == 0:
return "Direct"
path_nodes = routing_info.get('path_nodes', [])
if path_nodes:
path_str = ','.join(str(n).lower() for n in path_nodes)
return f"{path_str} ({len(path_nodes)} hops)"
if not message.path:
return "Unknown"
path_string = message.path
if " via ROUTE_TYPE_" in path_string:
path_string = path_string.split(" via ROUTE_TYPE_")[0]
return path_string.strip() or "Unknown"
def build_enhanced_connection_info(self, message: MeshMessage) -> str:
"""Build enhanced connection info with SNR, RSSI, and parsed route information"""
# Extract just the hops and path info without the route type
routing_info = message.path or "Unknown routing"
# Clean up the routing info to remove the "via ROUTE_TYPE_*" part
if "via ROUTE_TYPE_" in routing_info:
# Extract just the hops and path part
parts = routing_info.split(" via ROUTE_TYPE_")
if len(parts) > 0:
routing_info = parts[0]
# Add SNR and RSSI
"""Build enhanced connection info with SNR, RSSI, and parsed route information.
Uses message.routing_info when present (multi-byte path, direct) for path part.
"""
path_part = self.get_path_display_string(message)
snr_info = f"SNR: {message.snr or 'Unknown'} dB"
rssi_info = f"RSSI: {message.rssi or 'Unknown'} dBm"
# Build enhanced connection info
connection_info = f"{routing_info} | {snr_info} | {rssi_info}"
connection_info = f"{path_part} | {snr_info} | {rssi_info}"
return connection_info
def format_timestamp(self, message: MeshMessage) -> str:
+73 -43
View File
@@ -5,12 +5,13 @@ Listens for a period of time and collects all unique paths from incoming message
"""
import asyncio
import re
import time
from typing import Set, Optional, Dict
from dataclasses import dataclass
from .base_command import BaseCommand
from ..models import MeshMessage
from ..utils import calculate_packet_hash
from ..utils import calculate_packet_hash, parse_path_string
@dataclass
@@ -113,80 +114,109 @@ class MultitestCommand(BaseCommand):
return False
def extract_path_from_rf_data(self, rf_data: dict) -> Optional[str]:
"""Extract path in prefix string format from RF data routing_info"""
"""Extract path in prefix string format from RF data routing_info.
Supports 1-, 2-, and 3-byte-per-hop (2, 4, or 6 hex chars per node).
"""
try:
routing_info = rf_data.get('routing_info')
if not routing_info:
return None
path_nodes = routing_info.get('path_nodes', [])
if not path_nodes:
# Try to extract from path_hex if path_nodes not available
# Fallback: build from path_hex using bytes_per_hop from packet
path_hex = routing_info.get('path_hex', '')
if path_hex:
n = getattr(self.bot, 'prefix_hex_chars', 2)
bytes_per_hop = routing_info.get('bytes_per_hop')
n = (bytes_per_hop * 2) if bytes_per_hop and bytes_per_hop >= 1 else getattr(self.bot, 'prefix_hex_chars', 2)
if n <= 0:
n = 2
path_nodes = [path_hex[i:i+n] for i in range(0, len(path_hex), n)]
path_nodes = [path_hex[i:i + n] for i in range(0, len(path_hex), n)]
if (len(path_hex) % n) != 0:
path_nodes = [path_hex[i:i+2] for i in range(0, len(path_hex), 2)]
path_nodes = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)]
if path_nodes:
# Validate and format path nodes
valid_len = getattr(self.bot, 'prefix_hex_chars', 2)
if valid_len <= 0:
valid_len = 2
# Validate: each node 2, 4, or 6 hex chars
valid_parts = []
for node in path_nodes:
node_str = str(node).lower().strip()
if len(node_str) in (2, valid_len) and all(c in '0123456789abcdef' for c in node_str):
if len(node_str) in (2, 4, 6) and all(c in '0123456789abcdef' for c in node_str):
valid_parts.append(node_str)
if valid_parts:
return ','.join(valid_parts)
return None
except Exception as e:
self.logger.debug(f"Error extracting path from RF data: {e}")
return None
def extract_path_from_message(self, message: MeshMessage) -> Optional[str]:
"""Extract path in prefix string format from a message"""
"""Extract path in prefix string format from a message.
Prefers message.routing_info.path_nodes when present (multi-byte).
When routing_info has bytes_per_hop, uses it instead of inferring hop size.
Otherwise parses message.path: comma-separated tokens infer (2/4/6 hex);
continuous hex uses bot.prefix_hex_chars via parse_path_string().
"""
routing_info = getattr(message, 'routing_info', None)
bytes_per_hop = routing_info.get('bytes_per_hop') if routing_info else None
hex_chars_per_node = (bytes_per_hop * 2) if (bytes_per_hop and bytes_per_hop >= 1) else None
# Prefer routing_info when present (same as path command; no re-parse)
if routing_info is not None:
if routing_info.get('path_length', 0) == 0:
return None
path_nodes = routing_info.get('path_nodes', [])
if not path_nodes and hex_chars_per_node:
# Build from path_hex when path_nodes missing but bytes_per_hop known
path_hex = routing_info.get('path_hex', '')
if path_hex:
n = hex_chars_per_node
path_nodes = [path_hex[i:i + n] for i in range(0, len(path_hex), n)]
if (len(path_hex) % n) != 0:
path_nodes = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)]
if path_nodes:
valid = [str(n).lower().strip() for n in path_nodes
if len(str(n).strip()) in (2, 4, 6) and all(c in '0123456789abcdef' for c in str(n).lower().strip())]
if valid:
return ','.join(valid)
if not message.path:
return None
# Check if it's a direct connection
if "Direct" in message.path or "0 hops" in message.path:
return None
# Try to extract path nodes from the path string
# Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*"
# or just "node1,node2,node3"
path_string = message.path
# Remove route type suffix if present
if " via ROUTE_TYPE_" in path_string:
path_string = path_string.split(" via ROUTE_TYPE_")[0]
# Check if it looks like a comma-separated path
path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip()
# When bytes_per_hop is known, use it; otherwise infer from commas or use bot.prefix_hex_chars
expected_n = hex_chars_per_node or getattr(self.bot, 'prefix_hex_chars', 2)
if expected_n <= 0:
expected_n = 2
if ',' in path_string:
# Clean up any extra info (like hop counts in parentheses)
# Example: "01,7e,55,86 (4 hops)" -> "01,7e,55,86"
if '(' in path_string:
path_string = path_string.split('(')[0].strip()
# Validate that all parts are 2-character hex values
parts = path_string.split(',')
valid_parts = []
for part in parts:
part = part.strip()
# Check if it's a 2-character hex value
if len(part) == 2 and all(c in '0123456789abcdefABCDEF' for c in part):
valid_parts.append(part.lower())
if valid_parts:
return ','.join(valid_parts)
tokens = [t.strip() for t in path_string.split(',') if t.strip()]
if tokens:
if hex_chars_per_node:
# Use known hop size: all tokens must be that length
valid_hex = all(
len(t) == expected_n and all(c in '0123456789aAbBcCdDeEfF' for c in t)
for t in tokens
)
if valid_hex:
return ','.join(t.lower() for t in tokens)
else:
# Infer from token length (2, 4, or 6, all same)
lengths = {len(t) for t in tokens}
valid_hex = all(
len(t) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in t)
for t in tokens
)
if valid_hex and len(lengths) == 1 and next(iter(lengths)) in (2, 4, 6):
return ','.join(t.lower() for t in tokens)
# Continuous hex: use bytes_per_hop when known, else bot.prefix_hex_chars
node_ids = parse_path_string(path_string, prefix_hex_chars=expected_n)
if node_ids:
return ','.join(n.lower() for n in node_ids)
return None
def get_rf_data_for_message(self, message: MeshMessage) -> Optional[dict]:
+86 -76
View File
@@ -10,7 +10,7 @@ import asyncio
from typing import List, Optional, Dict, Any, Tuple, Callable
from .base_command import BaseCommand
from ..models import MeshMessage
from ..utils import calculate_distance
from ..utils import calculate_distance, parse_path_string
class PathCommand(BaseCommand):
@@ -217,39 +217,39 @@ class PathCommand(BaseCommand):
return True
async def _decode_path(self, path_input: str) -> str:
"""Decode hex path data to repeater names"""
"""Decode hex path data to repeater names.
Comma-separated tokens infer hop size (2, 4, or 6 hex chars per node).
Otherwise uses bot.prefix_hex_chars via parse_path_string().
"""
try:
# Parse the path input - handle various formats
# Examples: "11,98,a4,49,cd,5f,01" or "11 98 a4 49 cd 5f 01" or "1198a449cd5f01"
path_input = path_input.replace(',', ' ').replace(':', ' ')
# Extract hex values using regex
# Try configured width first
n = getattr(self.bot, "prefix_hex_chars", 2)
hex_pattern = rf'[0-9a-fA-F]{{{n}}}'
hex_matches = re.findall(hex_pattern, path_input)
# Strip hop-count suffix if present (e.g. "01,5f (2 hops)")
path_input = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_input, flags=re.IGNORECASE)
path_input = path_input.strip()
# Backward compatibility:
# if no matches and we're expecting >2 chars, try legacy 2-char paths
if not hex_matches and n > 2:
legacy_pattern = r'[0-9a-fA-F]{2}'
hex_matches = re.findall(legacy_pattern, path_input)
if not hex_matches:
node_ids = None
# Comma-separated: infer hex chars per node from token length (2, 4, or 6)
if ',' in path_input:
tokens = [t.strip() for t in path_input.split(',') if t.strip()]
if tokens:
lengths = {len(t) for t in tokens}
valid_hex = all(
len(t) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in t)
for t in tokens
)
if valid_hex and len(lengths) == 1 and next(iter(lengths)) in (2, 4, 6):
node_ids = [t.upper() for t in tokens]
if node_ids is None:
prefix_hex_chars = getattr(self.bot, 'prefix_hex_chars', 2)
node_ids = parse_path_string(path_input, prefix_hex_chars=prefix_hex_chars)
if not node_ids:
return self.translate('commands.path.no_valid_hex')
# Convert to uppercase for consistency
# hex_matches preserves the order from the original path
node_ids = [match.upper() for match in hex_matches]
self.logger.info(f"Decoding path with {len(node_ids)} nodes: {','.join(node_ids)}")
# Look up repeater names for each node ID (order preserved)
repeater_info = await self._lookup_repeater_names(node_ids)
# Format the response
return self._format_path_response(node_ids, repeater_info)
except Exception as e:
self.logger.error(f"Error decoding path: {e}")
return self.translate('commands.path.error_decoding', error=str(e))
@@ -1418,24 +1418,28 @@ class PathCommand(BaseCommand):
if stored_paths:
# Build the path we're decoding (full path context)
decoded_path_hex = ''.join([node.lower() for node in path_context])
path_n = len(path_context[0]) if path_context else 0 # hex chars per node in current path
# Check if any stored path shares common segments with decoded path
# This is useful for prefix collision resolution
for stored_path in stored_paths:
stored_hex = stored_path.get('path_hex', '').lower()
obs_count = stored_path.get('observation_count', 1)
if stored_hex:
# Chunk using stored bytes_per_hop (multi-byte path support)
n = (stored_path.get('bytes_per_hop') or 1) * 2
if n <= 0:
n = 2
stored_nodes = [stored_hex[i:i+n] for i in range(0, len(stored_hex), n)]
if (len(stored_hex) % n) != 0:
stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)]
decoded_nodes = [decoded_path_hex[i:i+n] for i in range(0, len(decoded_path_hex), n)]
if (len(decoded_path_hex) % n) != 0:
decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)]
if not stored_hex:
continue
# Chunk stored path by its bytes_per_hop
stored_n = (stored_path.get('bytes_per_hop') or 1) * 2
if stored_n <= 0:
stored_n = 2
# Only compare when same hop size (otherwise 1-byte vs 2-byte would mismatch)
if path_n != stored_n:
continue
stored_nodes = [stored_hex[i:i+stored_n] for i in range(0, len(stored_hex), stored_n)]
if (len(stored_hex) % stored_n) != 0:
stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)]
decoded_nodes = [decoded_path_hex[i:i+path_n] for i in range(0, len(decoded_path_hex), path_n)]
if (len(decoded_path_hex) % path_n) != 0:
decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)]
# Count how many nodes appear in both paths (in order)
common_segments = 0
@@ -1717,42 +1721,48 @@ class PathCommand(BaseCommand):
await self.send_response(message, current_message, skip_user_rate_limit=True)
async def _extract_path_from_recent_messages(self) -> str:
"""Extract path from the current message's path information (same as test command)"""
"""Extract path from the current message's path information (same as test command).
Prefers already-extracted routing_info.path_nodes when present (multi-byte path support).
"""
try:
# Use the path information from the current message being processed
# This is the same reliable source that the test command uses
if hasattr(self, '_current_message') and self._current_message and self._current_message.path:
path_string = self._current_message.path
# Check if it's a direct connection
if "Direct" in path_string or "0 hops" in path_string:
return self.translate('commands.path.direct_connection')
# Try to extract path nodes from the path string
# Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*"
if " via ROUTE_TYPE_" in path_string:
# Extract just the path part before the route type
path_part = path_string.split(" via ROUTE_TYPE_")[0]
else:
path_part = path_string
# Check if it looks like a comma-separated path
if ',' in path_part:
path_input = path_part
return await self._decode_path(path_input)
else:
# Try to decode even single nodes (e.g., "01" should be decoded to a repeater name)
# Check if path_part looks like it contains hex values
hex_pattern = rf'[0-9a-fA-F]{{{self.bot.prefix_hex_chars}}}'
if re.search(hex_pattern, path_part):
# Looks like hex values, try to decode
return await self._decode_path(path_part)
else:
# Unknown format - just show the raw path string
return self.translate('commands.path.path_prefix', path_string=path_string)
else:
if not hasattr(self, '_current_message') or not self._current_message:
return self.translate('commands.path.no_path')
msg = self._current_message
# Prefer routing_info when present (no re-parsing; preserves bytes_per_hop)
routing_info = getattr(msg, 'routing_info', None)
if routing_info is not None:
path_length = routing_info.get('path_length', 0)
if path_length == 0:
return self.translate('commands.path.direct_connection')
path_nodes = routing_info.get('path_nodes', [])
if path_nodes:
node_ids = [n.upper() for n in path_nodes]
self.logger.info(f"Decoding path from routing_info with {len(node_ids)} nodes: {','.join(node_ids)}")
repeater_info = await self._lookup_repeater_names(node_ids)
return self._format_path_response(node_ids, repeater_info)
# Fallback: parse message.path string (e.g. no routing_info or legacy path)
if not msg.path:
return self.translate('commands.path.no_path')
path_string = msg.path
if "Direct" in path_string or "0 hops" in path_string:
return self.translate('commands.path.direct_connection')
if " via ROUTE_TYPE_" in path_string:
path_part = path_string.split(" via ROUTE_TYPE_")[0]
else:
path_part = path_string
if ',' in path_part:
return await self._decode_path(path_part)
hex_pattern = rf'[0-9a-fA-F]{{{getattr(self.bot, "prefix_hex_chars", 2)}}}'
if re.search(hex_pattern, path_part):
return await self._decode_path(path_part)
return self.translate('commands.path.path_prefix', path_string=path_string)
except Exception as e:
self.logger.error(f"Error extracting path from current message: {e}")
return self.translate('commands.path.error_extracting', error=str(e))
+52 -43
View File
@@ -145,48 +145,30 @@ class TestCommand(BaseCommand):
return None
def _extract_path_node_ids(self, message: MeshMessage) -> List[str]:
"""Extract path node IDs from message path string.
"""Extract path node IDs from message. Prefers routing_info.path_nodes (multi-byte); else parses message.path.
Args:
message: The message object containing the path.
Returns:
List[str]: List of valid 2-character hex node IDs.
List[str]: List of node IDs (2, 4, or 6 hex chars per node, uppercase).
"""
if not message.path:
routing_info = getattr(message, 'routing_info', None)
if routing_info is not None and routing_info.get('path_length', 0) == 0:
return []
# Check if it's a direct connection
if "Direct" in message.path or "0 hops" in message.path:
if routing_info and routing_info.get('path_nodes'):
return [str(n).upper().strip() for n in routing_info['path_nodes']]
if not message.path or "Direct" in message.path or "0 hops" in message.path:
return []
# Extract path nodes from the path string
# Path strings are typically in format: "node1,node2,node3 via ROUTE_TYPE_*"
# or just "node1,node2,node3"
path_string = message.path
# Remove route type suffix if present
if " via ROUTE_TYPE_" in path_string:
path_string = path_string.split(" via ROUTE_TYPE_")[0]
# Check if it looks like a comma-separated path
if '(' in path_string:
path_string = path_string.split('(')[0].strip()
if ',' in path_string:
# Clean up any extra info (like hop counts in parentheses)
# Example: "01,7e,55,86 (4 hops)" -> "01,7e,55,86"
if '(' in path_string:
path_string = path_string.split('(')[0].strip()
# Validate that all parts are 2-character hex values
parts = path_string.split(',')
valid_parts = []
for part in parts:
part = part.strip()
# Check if it's a 2-character hex value
if len(part) == 2 and all(c in '0123456789abcdefABCDEF' for c in part):
valid_parts.append(part.upper())
return valid_parts
parts = [p.strip() for p in path_string.split(',') if p.strip()]
if parts and all(
len(p) in (2, 4, 6) and all(c in '0123456789aAbBcCdDeEfF' for c in p)
for p in parts
):
return [p.upper() for p in parts]
return []
@@ -576,8 +558,14 @@ class TestCommand(BaseCommand):
"""
node_ids = self._extract_path_node_ids(message)
if len(node_ids) < 2:
# Check if it's a direct connection
if not message.path or "Direct" in message.path or "0 hops" in message.path:
routing_info = getattr(message, 'routing_info', None)
is_direct = (
(routing_info is not None and routing_info.get('path_length', 0) == 0)
or not message.path
or "Direct" in (message.path or "")
or "0 hops" in (message.path or "")
)
if is_direct:
return "N/A" # Direct connection, no path to calculate
return "" # Path exists but insufficient nodes
@@ -628,8 +616,14 @@ class TestCommand(BaseCommand):
"""
node_ids = self._extract_path_node_ids(message)
if len(node_ids) < 2:
# Check if it's a direct connection
if not message.path or "Direct" in message.path or "0 hops" in message.path:
routing_info = getattr(message, 'routing_info', None)
is_direct = (
(routing_info is not None and routing_info.get('path_length', 0) == 0)
or not message.path
or "Direct" in (message.path or "")
or "0 hops" in (message.path or "")
)
if is_direct:
return "N/A" # Direct connection, no path to calculate
return "" # Path exists but insufficient nodes
@@ -686,20 +680,35 @@ class TestCommand(BaseCommand):
connection_info = self.build_enhanced_connection_info(message)
timestamp = self.format_timestamp(message)
elapsed = self.format_elapsed(message)
# Calculate distance placeholders
path_display = self.get_path_display_string(message)
# Hops: from message.hops, or routing_info.path_length, or len(path_nodes)
routing_info = getattr(message, 'routing_info', None)
if getattr(message, 'hops', None) is not None:
hops_val = message.hops
elif routing_info is not None:
hops_val = routing_info.get('path_length')
if hops_val is None and routing_info.get('path_nodes'):
hops_val = len(routing_info['path_nodes'])
else:
hops_val = None
hops_str = str(hops_val) if hops_val is not None else "?"
if hops_val is None:
hops_label = "?"
elif hops_val == 1:
hops_label = "1 hop"
else:
hops_label = f"{hops_val} hops"
path_distance = self._calculate_path_distance(message)
firstlast_distance = self._calculate_firstlast_distance(message)
# Format phrase part - add colon and space if phrase exists
phrase_part = f": {phrase}" if phrase else ""
return response_format.format(
sender=message.sender_id or self.translate('common.unknown_sender'),
phrase=phrase,
phrase_part=phrase_part,
connection_info=connection_info,
path=message.path or self.translate('common.unknown_path'),
path=path_display,
hops=hops_str,
hops_label=hops_label,
timestamp=timestamp,
elapsed=elapsed,
snr=message.snr or self.translate('common.unknown'),
+101 -74
View File
@@ -371,6 +371,7 @@ class MessageHandler:
# If we have RF data with routing information, update the path with that instead
if recent_rf_data and recent_rf_data.get('routing_info'):
rf_routing = recent_rf_data['routing_info']
message.routing_info = rf_routing # Path command uses this for multi-byte path (no re-parse)
if rf_routing.get('path_length', 0) > 0:
path_nodes = rf_routing.get('path_nodes', [])
route_type = rf_routing.get('route_type', 'Unknown')
@@ -763,13 +764,39 @@ class MessageHandler:
'payload_type': decoded_packet.get('payload_type_name', 'Unknown'),
'packet_hash': packet_hash # Store hash for packet tracking
}
# Validate path consistency (path_byte_length, path_hex, path_nodes, bytes_per_hop)
path_len = routing_info['path_length']
path_byte_len = routing_info.get('path_byte_length')
path_hex_str = routing_info.get('path_hex', '')
path_nodes_list = routing_info.get('path_nodes') or []
bph = routing_info.get('bytes_per_hop', 1) or 1
expected_hex_len = (path_byte_len * 2) if path_byte_len is not None else (path_len * bph * 2)
if path_len > 0 and path_hex_str:
if len(path_hex_str) != expected_hex_len:
self.logger.warning(
"Path length mismatch: path_hex has %d hex chars, expected %d (path_byte_length=%s, path_length=%s, bytes_per_hop=%s)",
len(path_hex_str), expected_hex_len, path_byte_len, path_len, bph
)
if path_nodes_list and len(path_nodes_list) != path_len:
self.logger.warning(
"Path nodes count mismatch: %d nodes, path_length=%d",
len(path_nodes_list), path_len
)
if path_nodes_list and bph >= 1 and any(len(str(n)) != bph * 2 for n in path_nodes_list):
self.logger.warning(
"Path node width mismatch: bytes_per_hop=%d expects %d hex chars per node, nodes=%s",
bph, bph * 2, path_nodes_list[:5]
)
# Log the routing information for analysis
if routing_info['path_length'] > 0:
# Format path with configured node length (match decode layer)
path_hex = routing_info['path_hex']
path_nodes_fmt = self._path_hex_to_nodes(path_hex)
formatted_path = ','.join(path_nodes_fmt)
# Use path_nodes when present (multi-byte); else chunk path_hex
path_nodes_list = routing_info.get('path_nodes') or []
if path_nodes_list:
formatted_path = ','.join(str(n).lower() for n in path_nodes_list)
else:
path_hex = routing_info['path_hex']
path_nodes_fmt = self._path_hex_to_nodes(path_hex)
formatted_path = ','.join(path_nodes_fmt)
path_bytes_str = decoded_packet.get('path_byte_length', routing_info['path_length'])
log_message = f"🛣️ ROUTING INFO: {routing_info['route_type']} | Path: {formatted_path} ({routing_info['path_length']} hops, {path_bytes_str} bytes) | Payload: {routing_info['payload_length']} bytes | Type: {routing_info['payload_type']}"
self.logger.info(log_message)
@@ -1375,6 +1402,55 @@ class MessageHandler:
nodes = [path_hex[i:i + 2].lower() for i in range(0, len(path_hex), 2)]
return nodes
def _get_path_from_rf_data(
self,
rf_data: Dict[str, Any],
payload_hex: Optional[str] = None,
packet_info: Optional[Dict[str, Any]] = None
) -> Tuple[Optional[str], Optional[List[str]], int]:
"""Get path string, path nodes, and hop count from RF data (single source for path extraction).
Prefers routing_info.path_nodes when present (no re-decode; correct multi-byte).
Otherwise decodes (or uses provided packet_info) and gets path from decoder's 'path'
or chunks path_hex using bytes_per_hop from the packet.
Returns:
(path_string, path_nodes, hops). path_nodes is a list for mesh graph; hops is path_length or 255.
"""
routing_info = rf_data.get('routing_info') or {}
path_nodes_list = routing_info.get('path_nodes')
if path_nodes_list:
path_str = ','.join(str(n).lower() for n in path_nodes_list)
return (path_str, list(path_nodes_list), len(path_nodes_list))
raw_hex = rf_data.get('raw_hex')
if not raw_hex:
return (None, None, 255)
if packet_info is None:
payload = payload_hex or rf_data.get('payload')
packet_info = self.decode_meshcore_packet(raw_hex, payload)
if not packet_info:
return (None, None, 255)
hops = packet_info.get('path_len', 255)
path_nodes_list = packet_info.get('path_nodes') or packet_info.get('path') or []
if path_nodes_list:
path_str = ','.join(str(n).lower() for n in path_nodes_list)
return (path_str, list(path_nodes_list), len(path_nodes_list))
path_hex = packet_info.get('path_hex', '')
if path_hex and len(path_hex) >= 2:
bytes_per_hop = packet_info.get('bytes_per_hop', 1)
n = (bytes_per_hop * 2) if bytes_per_hop and bytes_per_hop >= 1 else 2
path_nodes_list = [path_hex[i:i + n].lower() for i in range(0, len(path_hex), n)]
if (len(path_hex) % n) != 0:
path_nodes_list = [path_hex[i:i + 2].lower() for i in range(0, len(path_hex), 2)]
if path_nodes_list:
return (','.join(path_nodes_list), path_nodes_list, len(path_nodes_list))
path_info = packet_info.get('path_info') or {}
path_nodes_list = path_info.get('path') or []
if path_nodes_list:
path_str = ','.join(str(n).lower() for n in path_nodes_list)
return (path_str, list(path_nodes_list), len(path_nodes_list))
return (None, None, hops)
def _process_packet_path(self, path_bytes: bytes, payload: bytes,
route_type: RouteType, payload_type: PayloadType) -> dict:
"""
@@ -1637,96 +1713,45 @@ class MessageHandler:
rssi = recent_rf_data['rssi']
self.logger.debug(f"Using RSSI from RF data: {rssi}")
# Try to extract path information from raw hex directly
# Single path source: prefer routing_info, else decode/fallback via helper
path_string = None
hops = payload.get('path_len', 255)
# First try the packet decoder
# Use payload field if available, otherwise use raw_hex
payload_hex = recent_rf_data.get('payload')
packet_info = self.decode_meshcore_packet(raw_hex, payload_hex)
# Get packet_hash from recent_rf_data if available (for trace correlation)
packet_hash = recent_rf_data.get('packet_hash')
if packet_hash and packet_info:
packet_info['packet_hash'] = packet_hash
if packet_info and packet_info.get('path_len') is not None:
# Valid packet decoded - use the results even if path is empty (0 hops = direct)
hops = packet_info.get('path_len', 0)
# Check if this is a TRACE packet with SNR data
if packet_info.get('payload_type') == 9: # TRACE packet
# For TRACE packets, extract routing path from payload pathHashes
# The path field contains SNR data, but the actual routing path is in payload
path_info = packet_info.get('path_info', {})
path_hashes = path_info.get('path_hashes') or path_info.get('path', [])
if path_hashes:
# Convert pathHashes to path string
path_string = ','.join(path_hashes)
self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string} ({len(path_hashes)} hops)")
# Update mesh graph with trace path - bot is the destination, so we can confirm these edges
# Since the bot received this trace packet, it's the destination node
self.logger.debug(f"Path from TRACE packet: {path_string} ({len(path_hashes)} hops)")
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph_from_trace(path_hashes, packet_info)
else:
path_string = "Direct" if hops == 0 else f"Unknown routing ({hops} hops)"
self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string}")
self.logger.debug(f"Path from TRACE packet: {path_string}")
else:
# For all other packet types, try multiple methods to get the path
path_string = None
# Method 1: Try path_nodes field first
path_nodes = packet_info.get('path_nodes', [])
if path_nodes:
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)")
# Update mesh graph with path edges
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# Method 2: Try path_hex field
path_hex = packet_info.get('path_hex', '')
if path_hex and len(path_hex) >= 2:
path_nodes = self._path_hex_to_nodes(path_hex)
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET HEX: {path_string} ({hops} hops)")
# Update mesh graph with path edges
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# Method 3: Try path_info.path field
path_info = packet_info.get('path_info', {})
if path_info and path_info.get('path'):
path_nodes = path_info['path']
path_string = ','.join(path_nodes)
self.logger.info(f"🎯 EXTRACTED PATH FROM PATH_INFO: {path_string} ({hops} hops)")
# Update mesh graph with path edges
if hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
else:
# No path found - this is truly unknown
path_string = "Direct" if hops == 0 else "Unknown routing"
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)")
had_routing_nodes = bool((recent_rf_data.get('routing_info') or {}).get('path_nodes'))
path_string, path_nodes, hops = self._get_path_from_rf_data(
recent_rf_data, payload_hex=payload_hex, packet_info=packet_info
)
if path_string and path_nodes and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
self._update_mesh_graph(path_nodes, packet_info)
if path_string and not had_routing_nodes:
self.logger.debug(f"Path from fallback decode: {path_string} ({hops} hops)")
else:
# Packet decoding failed - try to extract path directly from raw hex
self.logger.debug("Packet decoding failed, trying direct hex parsing")
self.logger.debug("Packet decoding failed, trying direct hex or routing_info fallback")
path_string = self.extract_path_from_raw_hex(raw_hex, hops)
if path_string:
self.logger.info(f"🎯 EXTRACTED PATH FROM RAW HEX: {path_string} ({hops} hops)")
else:
# Try to use routing info from RF data as fallback
if recent_rf_data.get('routing_info') and recent_rf_data['routing_info'].get('path_nodes'):
routing_info = recent_rf_data['routing_info']
hops = len(routing_info['path_nodes'])
path_string = ','.join(routing_info['path_nodes'])
self.logger.info(f"🎯 EXTRACTED PATH FROM RF ROUTING INFO: {path_string} ({hops} hops)")
else:
# Final fallback to basic path info
self.logger.debug("No path info available, using basic path info")
path_string = None
if not path_string and recent_rf_data.get('routing_info') and recent_rf_data['routing_info'].get('path_nodes'):
routing_info = recent_rf_data['routing_info']
path_nodes = routing_info['path_nodes']
hops = len(path_nodes)
path_string = ','.join(str(n).lower() for n in path_nodes)
self.logger.debug(f"Path from RF routing_info fallback: {path_string} ({hops} hops)")
else:
self.logger.warning("❌ NO RF DATA found for channel message after all correlation attempts")
hops = payload.get('path_len', 255)
@@ -1760,6 +1785,8 @@ class MessageHandler:
elapsed=_elapsed,
is_dm=False
)
if recent_rf_data and recent_rf_data.get('routing_info'):
message.routing_info = recent_rf_data['routing_info']
# Path information is now set directly in the MeshMessage constructor from RF data
# No need for additional path extraction since we're using the actual routing data
+3 -1
View File
@@ -5,7 +5,7 @@ Contains shared data structures used across modules
"""
from dataclasses import dataclass
from typing import Optional
from typing import Any, Dict, Optional
@dataclass
@@ -22,3 +22,5 @@ class MeshMessage:
snr: Optional[float] = None
rssi: Optional[int] = None
elapsed: Optional[str] = None
# When set from RF routing: path_nodes, path_hex, bytes_per_hop, path_length, route_type, etc.
routing_info: Optional[Dict[str, Any]] = None
+1 -1
View File
@@ -24,7 +24,7 @@ dependencies = [
"maidenhead>=1.4.0",
"pytz>=2023.3",
"aiohttp>=3.8.0",
"meshcore>=2.1.6",
"meshcore>=2.2.14",
"openmeteo-requests>=1.7.2",
"requests-cache>=1.1.1",
"retry-requests>=1.0.0",
+1 -1
View File
@@ -18,7 +18,7 @@ pytz>=2023.3
aiohttp>=3.8.0
cryptography>=41.0.0
pynacl>=1.5.0
meshcore>=2.1.6
meshcore>=2.2.14
openmeteo-requests>=1.7.2
requests-cache>=1.1.1
retry-requests>=1.0.0
+146
View File
@@ -0,0 +1,146 @@
#!/usr/bin/env python3
"""
Unit tests for PathCommand multi-byte path support: routing_info usage and comma/prefix parsing.
"""
import pytest
from unittest.mock import Mock
from modules.commands.path_command import PathCommand
from modules.models import MeshMessage
@pytest.mark.unit
class TestPathCommandDecodePathMultibyte:
"""Test _decode_path parsing: comma-separated inference and parse_path_string fallback."""
@pytest.fixture
def path_command(self, mock_bot):
"""Create a PathCommand instance."""
return PathCommand(mock_bot)
@pytest.mark.asyncio
async def test_comma_separated_two_byte_infers_four_char_nodes(self, path_command, mock_bot):
"""path 0102,5f7e -> 2 nodes (0102, 5F7E) even when prefix_hex_chars=2 would give 4 nodes for continuous."""
mock_bot.prefix_hex_chars = 2
captured = []
async def capture_lookup(node_ids, lookup_func=None):
captured.append(node_ids)
return {nid: {'found': True, 'name': nid} for nid in node_ids}
path_command._lookup_repeater_names = capture_lookup
await path_command._decode_path("0102,5f7e")
assert len(captured) == 1
assert captured[0] == ['0102', '5F7E']
@pytest.mark.asyncio
async def test_comma_separated_one_byte_two_nodes(self, path_command, mock_bot):
"""path 01,5f -> 2 nodes (01, 5F)."""
mock_bot.prefix_hex_chars = 2
captured = []
async def capture_lookup(node_ids, lookup_func=None):
captured.append(node_ids)
return {nid: {'found': True, 'name': nid} for nid in node_ids}
path_command._lookup_repeater_names = capture_lookup
await path_command._decode_path("01,5f")
assert len(captured) == 1
assert captured[0] == ['01', '5F']
@pytest.mark.asyncio
async def test_continuous_hex_uses_prefix_hex_chars(self, path_command, mock_bot):
"""path 01025f7e: prefix_hex_chars=2 -> 4 nodes (1-byte); prefix_hex_chars=4 -> 2 nodes (2-byte)."""
captured = []
async def capture_lookup(node_ids, lookup_func=None):
captured.append(list(node_ids))
return {nid: {'found': True, 'name': nid} for nid in node_ids}
path_command._lookup_repeater_names = capture_lookup
mock_bot.prefix_hex_chars = 2 # 2 hex chars per node = 1 byte per hop
captured.clear()
await path_command._decode_path("01025f7e")
assert len(captured) == 1
assert captured[0] == ['01', '02', '5F', '7E']
mock_bot.prefix_hex_chars = 4 # 4 hex chars per node = 2 bytes per hop
captured.clear()
await path_command._decode_path("01025f7e")
assert len(captured) == 1
assert captured[0] == ['0102', '5F7E']
@pytest.mark.asyncio
async def test_strips_hop_count_suffix(self, path_command, mock_bot):
"""path 01,5f (2 hops) -> 2 nodes."""
mock_bot.prefix_hex_chars = 2
captured = []
async def capture_lookup(node_ids, lookup_func=None):
captured.append(node_ids)
return {nid: {'found': True, 'name': nid} for nid in node_ids}
path_command._lookup_repeater_names = capture_lookup
await path_command._decode_path("01,5f (2 hops)")
assert len(captured) == 1
assert captured[0] == ['01', '5F']
@pytest.mark.unit
class TestPathCommandExtractPathUsesRoutingInfo:
"""Test _extract_path_from_recent_messages prefers routing_info.path_nodes when present."""
@pytest.fixture
def path_command(self, mock_bot):
"""Create a PathCommand instance."""
return PathCommand(mock_bot)
@pytest.mark.asyncio
async def test_uses_routing_info_path_nodes_when_present(self, path_command, mock_bot):
"""When message has routing_info with path_nodes, use them directly (no _decode_path)."""
path_command._current_message = MeshMessage(
content="path",
path="0102,5f7e (2 hops via FLOOD)",
routing_info={
'path_length': 2,
'path_nodes': ['0102', '5f7e'],
'path_hex': '01025f7e',
'bytes_per_hop': 2,
'route_type': 'FLOOD',
},
)
captured = []
async def capture_lookup(node_ids, lookup_func=None):
captured.append(list(node_ids))
return {nid: {'found': True, 'name': nid} for nid in node_ids}
path_command._lookup_repeater_names = capture_lookup
decode_path_called = []
async def track_decode(path_input):
decode_path_called.append(path_input)
return "decode_path_result"
path_command._decode_path = track_decode
path_command.translate = lambda key, **kwargs: f"msg:{kwargs.get('node_id', kwargs.get('name', key))}"
result = await path_command._extract_path_from_recent_messages()
assert len(captured) == 1
assert captured[0] == ['0102', '5F7E']
assert len(decode_path_called) == 0, "Should not call _decode_path when routing_info.path_nodes present"
assert '0102' in result and '5F7E' in result
@pytest.mark.asyncio
async def test_direct_connection_when_routing_info_path_length_zero(self, path_command, mock_bot):
"""When routing_info.path_length is 0, return direct connection message."""
path_command._current_message = MeshMessage(
content="path",
path="Direct via FLOOD",
routing_info={'path_length': 0, 'path_nodes': [], 'route_type': 'FLOOD'},
)
path_command.translate = lambda key, **kwargs: "Direct connection" if "direct" in key.lower() else key
result = await path_command._extract_path_from_recent_messages()
assert "direct" in result.lower()