feat(commands, config): implement path byte length requirements for commands

- Added configuration options to specify minimum path byte length requirements for the path, test, and multitest commands.
- Introduced methods to enforce these requirements and handle failure responses.
- Updated the packet capture service to correctly calculate packet hashes using the raw wire byte length.
- Enhanced unit tests to verify the correct behavior of the new path byte length logic.
This commit is contained in:
agessaman
2026-04-25 09:05:17 -07:00
parent 5f40c45f07
commit 63592bdd65
9 changed files with 352 additions and 82 deletions
+25
View File
@@ -851,6 +851,15 @@ mesh_connections_retention_days = 7
# Enable or disable the path command
enabled = true
# Require minimum path byte length before responding to this command
# 0 = allow all (default)
# 1 = allow all (compatibility alias for 0)
# 2 = only respond when path has 2 bytes or more (e.g. 0101...)
# 3 = only respond when path is exactly 3 bytes (e.g. 010101)
require_path_bytes_greater_or_equal_to = 0
# Optional response when rejected by path-byte requirement; leave empty for silent reject
require_path_bytes_failure_response =
# Enable "p" shortcut for path command (similar to "t" for test command)
# true: Respond to just "p" or "p <path_data>" as a shortcut for "path" (default)
# false: Only respond to "path", "decode", or "route" keywords
@@ -1095,6 +1104,14 @@ enabled = false
response_format = @[{sender}] found {path_count} unique path(s):\n{paths}
# condense_paths: false = one full path per line; true = flat condensed tree (default); nested = indented ├/└ under ┐ sub-blocks (uses ASCII spaces, not U+2502, to save bytes vs old │).
condense_paths = true
# Require minimum path byte length for triggering and collected packets
# 0 = allow all (default)
# 1 = allow all (compatibility alias for 0)
# 2 = only include paths with 2 bytes or more (e.g. 0101...)
# 3 = only include paths with exactly 3 bytes (e.g. 010101)
require_path_bytes_greater_or_equal_to = 0
# Optional response when rejected by path-byte requirement; leave empty for silent reject
require_path_bytes_failure_response =
[Greeter_Command]
# Enable greeter to greet users on first channel message (true/false)
@@ -1324,6 +1341,14 @@ enabled = true
[Test_Command]
enabled = true
# Require minimum path byte length before responding to test/t
# 0 = allow all (default)
# 1 = allow all (compatibility alias for 0)
# 2 = only respond when path has 2 bytes or more (e.g. 0101...)
# 3 = only respond when path is exactly 3 bytes (e.g. 010101)
require_path_bytes_greater_or_equal_to = 0
# Optional response when rejected by path-byte requirement; leave empty for silent reject
require_path_bytes_failure_response =
# channels =
[Trace_Command]
+100
View File
@@ -919,6 +919,106 @@ class BaseCommand(ABC):
"""Check if this command can execute right now (permissions, cooldown, etc.)"""
return self.can_execute(message)
def _get_required_path_bytes_setting(self, section: str) -> int:
"""Return normalized path-byte requirement for a command section.
Supported values:
- 0/1: allow all
- 2: require >= 2 bytes
- 3: require exactly 3 bytes
"""
required = self.get_config_value(
section,
'require_path_bytes_greater_or_equal_to',
fallback=0,
value_type='int',
)
if required in (0, 1, 2, 3):
return required
self.logger.warning(
f"Invalid {section}.require_path_bytes_greater_or_equal_to={required}; defaulting to 0"
)
return 0
def _path_bytes_match_requirement(self, path_byte_length: int, required_path_bytes: int) -> bool:
"""Check whether path byte length satisfies configured requirement."""
if required_path_bytes in (0, 1):
return True
if required_path_bytes == 2:
return path_byte_length >= 2
if required_path_bytes == 3:
return path_byte_length == 3
return True
def _get_message_path_byte_length(self, message: MeshMessage) -> int:
"""Best-effort extraction of message path byte length from routing/path data."""
routing_info = getattr(message, 'routing_info', None)
if routing_info:
raw_path_byte_length = routing_info.get('path_byte_length')
if isinstance(raw_path_byte_length, int) and raw_path_byte_length >= 0:
return raw_path_byte_length
bytes_per_hop = routing_info.get('bytes_per_hop')
path_length = routing_info.get('path_length')
if (
isinstance(bytes_per_hop, int)
and bytes_per_hop >= 0
and isinstance(path_length, int)
and path_length >= 0
):
return bytes_per_hop * path_length
path_nodes = routing_info.get('path_nodes') or []
if path_nodes:
total = 0
for node in path_nodes:
node_str = str(node).strip()
if not node_str:
continue
total += len(node_str) // 2
return total
path_string = (getattr(message, 'path', None) or '').strip()
if not path_string:
return 0
if " via ROUTE_TYPE_" in path_string:
path_string = path_string.split(" via ROUTE_TYPE_")[0]
if "Direct" in path_string or "0 hops" in path_string:
return 0
path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip()
if not path_string:
return 0
if ',' in path_string:
tokens = [t.strip() for t in path_string.split(',') if t.strip()]
if tokens:
return sum(len(t) // 2 for t in tokens)
return len(path_string) // 2
def _get_required_path_bytes_failure_response(self, section: str) -> Optional[str]:
"""Get optional response text used when path-byte requirement fails."""
failure_response = self.get_config_value(
section,
'require_path_bytes_failure_response',
fallback='',
value_type='str',
)
if not failure_response:
return None
cleaned = self._strip_quotes_from_config(failure_response).strip()
return cleaned or None
async def enforce_path_byte_requirement(self, message: MeshMessage, section: str) -> bool:
"""Apply path-byte requirement; optionally send configured failure response."""
required = self._get_required_path_bytes_setting(section)
path_byte_length = self._get_message_path_byte_length(message)
if self._path_bytes_match_requirement(path_byte_length, required):
return True
failure_response = self._get_required_path_bytes_failure_response(section)
if failure_response:
await self.send_response(message, failure_response)
return False
def get_path_display_string(self, message: MeshMessage) -> str:
"""Get path string for display (test/ack placeholders). Prefers message.routing_info for multi-byte and direct."""
routing_info = getattr(message, 'routing_info', None)
+44 -2
View File
@@ -542,6 +542,7 @@ class MultitestSession:
listening_duration: float
collected_paths: set[str]
initial_path: Optional[str] = None
required_path_bytes_mode: int = 0
class MultitestCommand(BaseCommand):
@@ -741,6 +742,34 @@ class MultitestCommand(BaseCommand):
return ','.join(n.lower() for n in node_ids)
return None
def _get_routing_info_path_byte_length(self, routing_info: dict) -> int:
"""Best-effort extraction of path byte length from routing_info."""
if not routing_info:
return 0
raw_path_byte_length = routing_info.get('path_byte_length')
if isinstance(raw_path_byte_length, int) and raw_path_byte_length >= 0:
return raw_path_byte_length
bytes_per_hop = routing_info.get('bytes_per_hop')
path_length = routing_info.get('path_length')
if (
isinstance(bytes_per_hop, int)
and bytes_per_hop >= 0
and isinstance(path_length, int)
and path_length >= 0
):
return bytes_per_hop * path_length
path_nodes = routing_info.get('path_nodes') or []
total = 0
for node in path_nodes:
node_str = str(node).strip()
if not node_str:
continue
total += len(node_str) // 2
return total
def get_rf_data_for_message(self, message: MeshMessage) -> Optional[dict]:
"""Get RF data for a message by looking it up in recent RF data"""
try:
@@ -831,6 +860,11 @@ class MultitestCommand(BaseCommand):
# CRITICAL: Only collect paths if this message has the same hash as the target
# This ensures we only track variations of the same original message
if message_hash == session.target_packet_hash:
routing_info = rf_data.get('routing_info', {})
path_byte_length = self._get_routing_info_path_byte_length(routing_info)
if not self._path_bytes_match_requirement(path_byte_length, session.required_path_bytes_mode):
continue
# Try to extract path from RF data first (more reliable)
path = self.extract_path_from_rf_data(rf_data)
@@ -843,7 +877,6 @@ class MultitestCommand(BaseCommand):
self.logger.info(f"✓ Collected path for user {user_id}: {path} (hash: {message_hash[:8]}...)")
else:
# Log when we have a matching hash but can't extract path
routing_info = rf_data.get('routing_info', {})
path_length = routing_info.get('path_length', 0)
if path_length == 0:
self.logger.debug(f"Matched hash {message_hash[:8]}... but path is direct (0 hops) for user {user_id}")
@@ -880,6 +913,11 @@ class MultitestCommand(BaseCommand):
# CRITICAL: Only process if hash matches exactly and is not None/empty
if packet_hash and packet_hash == session.target_packet_hash:
routing_info = rf_data.get('routing_info', {})
path_byte_length = self._get_routing_info_path_byte_length(routing_info)
if not self._path_bytes_match_requirement(path_byte_length, session.required_path_bytes_mode):
continue
matching_count += 1
# Extract path from this RF data
path = self.extract_path_from_rf_data(rf_data)
@@ -902,6 +940,9 @@ class MultitestCommand(BaseCommand):
async def execute(self, message: MeshMessage) -> bool:
"""Execute the multitest command"""
user_id = message.sender_id or "unknown"
required_path_bytes_mode = self._get_required_path_bytes_setting('Multitest_Command')
if not await self.enforce_path_byte_requirement(message, 'Multitest_Command'):
return True
# Use lock to prevent concurrent execution from interfering
async with self._get_execution_lock():
@@ -985,7 +1026,8 @@ class MultitestCommand(BaseCommand):
listening_start_time=time.time(),
listening_duration=listening_duration,
collected_paths=set(),
initial_path=initial_path
initial_path=initial_path,
required_path_bytes_mode=required_path_bytes_mode,
)
# Add initial path if available
+3
View File
@@ -197,6 +197,9 @@ class PathCommand(BaseCommand):
"""Execute path decode command"""
self.logger.info(f"Path command executed with content: {message.content}")
if not await self.enforce_path_byte_requirement(message, 'Path_Command'):
return True
# Store the current message for use in _extract_path_from_recent_messages
self._current_message = message
+3
View File
@@ -710,6 +710,9 @@ class TestCommand(BaseCommand):
Returns:
bool: True if execution was successful.
"""
if not await self.enforce_path_byte_requirement(message, 'Test_Command'):
return True
# Store the current message for use in location lookups
self._current_message = message
return await self.handle_keyword_match(message)
@@ -704,88 +704,15 @@ class PacketCaptureService(BaseServicePlugin):
snr = str(payload.get('snr', 'Unknown'))
rssi = str(payload.get('rssi', 'Unknown'))
# Get packet hash - check multiple sources in order of preference, then calculate if needed
# This matches the original script's approach: use hash from routing_info if available, otherwise calculate
packet_hash = '0000000000000000'
# Get packet hash from decoded packet_info — same clean bytes as the upload's "raw" field,
# so this is always the correct hash (matches what other observers compute).
packet_hash = packet_info.get('packet_hash', '0000000000000000')
# 1. Check if payload has packet_hash directly (from bot's processing)
if isinstance(payload, dict):
if 'packet_hash' in payload:
packet_hash = payload['packet_hash']
# 2. Check if payload has routing_info with packet_hash
elif 'routing_info' in payload:
routing_info = payload.get('routing_info', {})
if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
packet_hash = routing_info['packet_hash']
# 3. Check metadata if available
if packet_hash == '0000000000000000' and metadata and isinstance(metadata, dict):
if 'packet_hash' in metadata:
packet_hash = metadata['packet_hash']
elif 'routing_info' in metadata:
routing_info = metadata.get('routing_info', {})
if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
packet_hash = routing_info['packet_hash']
# 4. Try to get from bot's recent_rf_data cache (if message_handler has processed it)
# Note: The bot stores full raw_hex (with framing bytes) for packet_prefix, but we use stripped raw_hex
# So we need to use the full raw_hex from payload for prefix matching
# Optimized: Use indexed rf_data_by_pubkey for O(1) lookup instead of linear search
if packet_hash == '0000000000000000' and hasattr(self.bot, 'message_handler'):
try:
message_handler = self.bot.message_handler
# Get full raw_hex from payload for prefix matching (bot uses full raw_hex for packet_prefix)
full_raw_hex = payload.get('raw_hex', '')
if full_raw_hex:
packet_prefix = full_raw_hex.replace('0x', '')[:32] if len(full_raw_hex.replace('0x', '')) >= 32 else full_raw_hex.replace('0x', '')
else:
# Fallback: use stripped raw_hex (might not match, but worth trying)
clean_raw_hex_for_lookup = raw_hex.replace('0x', '')
packet_prefix = clean_raw_hex_for_lookup[:32] if len(clean_raw_hex_for_lookup) >= 32 else clean_raw_hex_for_lookup
# Use indexed lookup (O(1)) instead of linear search (O(n))
if hasattr(message_handler, 'rf_data_by_pubkey') and packet_prefix:
rf_data_list = message_handler.rf_data_by_pubkey.get(packet_prefix, [])
# Check most recent entries first (last in list, since they're appended in order)
for rf_data in reversed(rf_data_list):
if 'packet_hash' in rf_data:
packet_hash = rf_data['packet_hash']
break
elif 'routing_info' in rf_data:
routing_info = rf_data.get('routing_info', {})
if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
packet_hash = routing_info['packet_hash']
break
# Fallback to linear search only if indexed lookup not available (backward compatibility)
if packet_hash == '0000000000000000' and hasattr(message_handler, 'recent_rf_data'):
for rf_data in message_handler.recent_rf_data:
# Match by packet_prefix (bot uses full raw_hex for this)
if rf_data.get('packet_prefix') == packet_prefix:
if 'packet_hash' in rf_data:
packet_hash = rf_data['packet_hash']
break
elif 'routing_info' in rf_data:
routing_info = rf_data.get('routing_info', {})
if isinstance(routing_info, dict) and 'packet_hash' in routing_info:
packet_hash = routing_info['packet_hash']
break
except Exception as e:
if self.debug:
self.logger.debug(f"Error checking recent_rf_data for hash: {e}")
# 5. Fall back to hash from decoded packet_info (should be calculated correctly)
if packet_hash == '0000000000000000':
packet_hash = packet_info.get('packet_hash', '0000000000000000')
# 6. If still no hash, calculate it from raw_hex (matches original script's format_packet_data)
# Only fall back to direct calculation if decode_packet didn't produce a hash
if packet_hash == '0000000000000000':
try:
# Use payload_type_value from packet_info if available, otherwise None (will be extracted from header)
payload_type_value = packet_info.get('payload_type_value')
if payload_type_value is not None:
# Ensure it's an integer (handle enum.value if passed)
if hasattr(payload_type_value, 'value'):
payload_type_value = payload_type_value.value
payload_type_value = int(payload_type_value) & 0x0F
+2 -3
View File
@@ -529,9 +529,8 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s
if payload_type == 9: # PAYLOAD_TYPE_TRACE
# C++ does: sha.update(&path_len, sizeof(path_len))
# path_len is uint16_t, so sizeof(path_len) = 2 bytes
# Convert path_len to 2-byte little-endian uint16_t
hash_obj.update(path_byte_length.to_bytes(2, byteorder='little'))
# path_len is the raw wire byte (uint16_t in firmware), not the decoded byte count
hash_obj.update(path_len_byte.to_bytes(2, byteorder='little'))
hash_obj.update(payload_data)
+139
View File
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""Unit tests for path-byte gating across test/multitest/path commands."""
import configparser
from unittest.mock import AsyncMock, MagicMock, Mock
import pytest
from modules.commands.multitest_command import MultitestCommand, MultitestSession
from modules.commands.path_command import PathCommand
from modules.commands.test_command import TestCommand as MeshTestCommand
from tests.conftest import mock_message
def _base_bot() -> MagicMock:
bot = MagicMock()
bot.logger = Mock()
bot.config = configparser.ConfigParser()
bot.config.add_section("Bot")
bot.config.set("Bot", "bot_name", "TestBot")
bot.config.add_section("Channels")
bot.config.set("Channels", "monitor_channels", "general")
bot.config.set("Channels", "respond_to_dms", "true")
bot.config.add_section("Keywords")
bot.config.set("Keywords", "test", "ack")
bot.command_manager = MagicMock()
bot.command_manager.monitor_channels = ["general"]
bot.command_manager.send_response = AsyncMock(return_value=True)
bot.translator = MagicMock()
bot.translator.translate = Mock(side_effect=lambda key, **kwargs: key)
bot.prefix_hex_chars = 2
return bot
@pytest.mark.asyncio
async def test_test_command_rejects_silently_when_path_bytes_too_short():
bot = _base_bot()
bot.config.add_section("Test_Command")
bot.config.set("Test_Command", "enabled", "true")
bot.config.set("Test_Command", "require_path_bytes_greater_or_equal_to", "2")
bot.config.add_section("Path_Command")
bot.config.set("Path_Command", "recency_weight", "0.2")
cmd = MeshTestCommand(bot)
cmd.handle_keyword_match = AsyncMock(return_value=True)
message = mock_message(content="test", channel="general", routing_info={"path_byte_length": 1})
result = await cmd.execute(message)
assert result is True
cmd.handle_keyword_match.assert_not_called()
bot.command_manager.send_response.assert_not_called()
@pytest.mark.asyncio
async def test_test_command_sends_failure_response_when_configured():
bot = _base_bot()
bot.config.add_section("Test_Command")
bot.config.set("Test_Command", "enabled", "true")
bot.config.set("Test_Command", "require_path_bytes_greater_or_equal_to", "3")
bot.config.set("Test_Command", "require_path_bytes_failure_response", "Need 3-byte path")
bot.config.add_section("Path_Command")
bot.config.set("Path_Command", "recency_weight", "0.2")
cmd = MeshTestCommand(bot)
cmd.handle_keyword_match = AsyncMock(return_value=True)
message = mock_message(content="test", channel="general", routing_info={"path_byte_length": 2})
result = await cmd.execute(message)
assert result is True
cmd.handle_keyword_match.assert_not_called()
bot.command_manager.send_response.assert_awaited_once()
@pytest.mark.asyncio
async def test_path_command_mode_one_behaves_like_zero():
bot = _base_bot()
bot.config.add_section("Path_Command")
bot.config.set("Path_Command", "enabled", "true")
bot.config.set("Path_Command", "require_path_bytes_greater_or_equal_to", "1")
cmd = PathCommand(bot)
cmd._send_path_response = AsyncMock(return_value=True)
cmd._extract_path_from_recent_messages = AsyncMock(return_value="decoded")
message = mock_message(content="path", channel="general", routing_info={"path_byte_length": 0})
result = await cmd.execute(message)
assert result is True
cmd._extract_path_from_recent_messages.assert_awaited_once()
cmd._send_path_response.assert_awaited_once()
@pytest.mark.asyncio
async def test_multitest_execute_rejects_with_custom_failure_message():
bot = _base_bot()
bot.config.add_section("Multitest_Command")
bot.config.set("Multitest_Command", "enabled", "true")
bot.config.set("Multitest_Command", "require_path_bytes_greater_or_equal_to", "2")
bot.config.set("Multitest_Command", "require_path_bytes_failure_response", "Need >=2 path bytes")
cmd = MultitestCommand(bot)
message = mock_message(content="multitest", channel="general", routing_info={"path_byte_length": 1})
result = await cmd.execute(message)
assert result is True
bot.command_manager.send_response.assert_awaited_once()
def test_multitest_listener_filters_matching_hash_by_required_path_bytes():
bot = _base_bot()
bot.config.add_section("Multitest_Command")
bot.config.set("Multitest_Command", "enabled", "true")
cmd = MultitestCommand(bot)
cmd.extract_path_from_rf_data = Mock(return_value="0101")
cmd.get_rf_data_for_message = Mock(
return_value={
"packet_hash": "abc123",
"routing_info": {"path_byte_length": 2, "path_nodes": ["0101"]},
}
)
session = MultitestSession(
user_id="alice",
target_packet_hash="abc123",
triggering_timestamp=0.0,
listening_start_time=0.0,
listening_duration=9999.0,
collected_paths=set(),
required_path_bytes_mode=3,
)
cmd._active_sessions["alice"] = session
message = mock_message(content="x", sender_id="alice")
cmd.on_message_received(message)
assert session.collected_paths == set()
@@ -1,8 +1,40 @@
"""Packet capture TRACE decode: path must come from payload, not RF SNR bytes."""
import hashlib
import logging
from modules.service_plugins.packet_capture_service import PacketCaptureService
from modules.utils import calculate_packet_hash
def test_calculate_packet_hash_trace_uses_wire_byte():
"""TRACE hash must use the raw path_len wire byte, not the decoded byte count.
When size_code > 0 (multi-byte-per-hop paths), path_len_byte != path_byte_length.
The firmware hashes path_len_byte (the raw wire value), so we must too.
"""
# header 0x26: route_type=DIRECT(2), payload_type=TRACE(9), version=VER_1(0)
# path_len_byte 0x42: size_code=1 → 2 bytes/hop, hop_count=2 → path_byte_length=4
# path: AABBCCDD (4 bytes), payload: 01020304
raw_hex = "2642AABBCCDD01020304"
# Correct: use path_len_byte=0x42 (66), matching firmware Packet::calculatePacketHash()
expected = hashlib.sha256(
bytes([0x09])
+ (0x42).to_bytes(2, "little")
+ bytes.fromhex("01020304")
).hexdigest()[:16].upper()
result = calculate_packet_hash(raw_hex, 9)
assert result == expected, f"Expected {expected}, got {result}"
# Confirm this differs from the old buggy value (path_byte_length=4 instead of wire byte 0x42)
buggy = hashlib.sha256(
bytes([0x09])
+ (4).to_bytes(2, "little")
+ bytes.fromhex("01020304")
).hexdigest()[:16].upper()
assert result != buggy, "Hash must not match old path_byte_length computation"
def test_decode_packet_trace_uses_payload_route_hashes():