diff --git a/config.ini.example b/config.ini.example index e14dab9..8c1bc79 100644 --- a/config.ini.example +++ b/config.ini.example @@ -851,6 +851,15 @@ mesh_connections_retention_days = 7 # Enable or disable the path command enabled = true +# Require minimum path byte length before responding to this command +# 0 = allow all (default) +# 1 = allow all (compatibility alias for 0) +# 2 = only respond when path has 2 bytes or more (e.g. 0101...) +# 3 = only respond when path is exactly 3 bytes (e.g. 010101) +require_path_bytes_greater_or_equal_to = 0 +# Optional response when rejected by path-byte requirement; leave empty for silent reject +require_path_bytes_failure_response = + # Enable "p" shortcut for path command (similar to "t" for test command) # true: Respond to just "p" or "p " as a shortcut for "path" (default) # false: Only respond to "path", "decode", or "route" keywords @@ -1095,6 +1104,14 @@ enabled = false response_format = @[{sender}] found {path_count} unique path(s):\n{paths} # condense_paths: false = one full path per line; true = flat condensed tree (default); nested = indented ├/└ under ┐ sub-blocks (uses ASCII spaces, not U+2502, to save bytes vs old │). condense_paths = true +# Require minimum path byte length for triggering and collected packets +# 0 = allow all (default) +# 1 = allow all (compatibility alias for 0) +# 2 = only include paths with 2 bytes or more (e.g. 0101...) +# 3 = only include paths with exactly 3 bytes (e.g. 010101) +require_path_bytes_greater_or_equal_to = 0 +# Optional response when rejected by path-byte requirement; leave empty for silent reject +require_path_bytes_failure_response = [Greeter_Command] # Enable greeter to greet users on first channel message (true/false) @@ -1324,6 +1341,14 @@ enabled = true [Test_Command] enabled = true +# Require minimum path byte length before responding to test/t +# 0 = allow all (default) +# 1 = allow all (compatibility alias for 0) +# 2 = only respond when path has 2 bytes or more (e.g. 0101...) +# 3 = only respond when path is exactly 3 bytes (e.g. 010101) +require_path_bytes_greater_or_equal_to = 0 +# Optional response when rejected by path-byte requirement; leave empty for silent reject +require_path_bytes_failure_response = # channels = [Trace_Command] diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index a66aca7..1b49b75 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -919,6 +919,106 @@ class BaseCommand(ABC): """Check if this command can execute right now (permissions, cooldown, etc.)""" return self.can_execute(message) + def _get_required_path_bytes_setting(self, section: str) -> int: + """Return normalized path-byte requirement for a command section. + + Supported values: + - 0/1: allow all + - 2: require >= 2 bytes + - 3: require exactly 3 bytes + """ + required = self.get_config_value( + section, + 'require_path_bytes_greater_or_equal_to', + fallback=0, + value_type='int', + ) + if required in (0, 1, 2, 3): + return required + self.logger.warning( + f"Invalid {section}.require_path_bytes_greater_or_equal_to={required}; defaulting to 0" + ) + return 0 + + def _path_bytes_match_requirement(self, path_byte_length: int, required_path_bytes: int) -> bool: + """Check whether path byte length satisfies configured requirement.""" + if required_path_bytes in (0, 1): + return True + if required_path_bytes == 2: + return path_byte_length >= 2 + if required_path_bytes == 3: + return path_byte_length == 3 + return True + + def _get_message_path_byte_length(self, message: MeshMessage) -> int: + """Best-effort extraction of message path byte length from routing/path data.""" + routing_info = getattr(message, 'routing_info', None) + if routing_info: + raw_path_byte_length = routing_info.get('path_byte_length') + if isinstance(raw_path_byte_length, int) and raw_path_byte_length >= 0: + return raw_path_byte_length + + bytes_per_hop = routing_info.get('bytes_per_hop') + path_length = routing_info.get('path_length') + if ( + isinstance(bytes_per_hop, int) + and bytes_per_hop >= 0 + and isinstance(path_length, int) + and path_length >= 0 + ): + return bytes_per_hop * path_length + + path_nodes = routing_info.get('path_nodes') or [] + if path_nodes: + total = 0 + for node in path_nodes: + node_str = str(node).strip() + if not node_str: + continue + total += len(node_str) // 2 + return total + + path_string = (getattr(message, 'path', None) or '').strip() + if not path_string: + return 0 + if " via ROUTE_TYPE_" in path_string: + path_string = path_string.split(" via ROUTE_TYPE_")[0] + if "Direct" in path_string or "0 hops" in path_string: + return 0 + path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip() + if not path_string: + return 0 + if ',' in path_string: + tokens = [t.strip() for t in path_string.split(',') if t.strip()] + if tokens: + return sum(len(t) // 2 for t in tokens) + return len(path_string) // 2 + + def _get_required_path_bytes_failure_response(self, section: str) -> Optional[str]: + """Get optional response text used when path-byte requirement fails.""" + failure_response = self.get_config_value( + section, + 'require_path_bytes_failure_response', + fallback='', + value_type='str', + ) + if not failure_response: + return None + cleaned = self._strip_quotes_from_config(failure_response).strip() + return cleaned or None + + async def enforce_path_byte_requirement(self, message: MeshMessage, section: str) -> bool: + """Apply path-byte requirement; optionally send configured failure response.""" + required = self._get_required_path_bytes_setting(section) + path_byte_length = self._get_message_path_byte_length(message) + if self._path_bytes_match_requirement(path_byte_length, required): + return True + + failure_response = self._get_required_path_bytes_failure_response(section) + if failure_response: + await self.send_response(message, failure_response) + return False + def get_path_display_string(self, message: MeshMessage) -> str: """Get path string for display (test/ack placeholders). Prefers message.routing_info for multi-byte and direct.""" routing_info = getattr(message, 'routing_info', None) diff --git a/modules/commands/multitest_command.py b/modules/commands/multitest_command.py index 17dcb5c..3bb4835 100644 --- a/modules/commands/multitest_command.py +++ b/modules/commands/multitest_command.py @@ -542,6 +542,7 @@ class MultitestSession: listening_duration: float collected_paths: set[str] initial_path: Optional[str] = None + required_path_bytes_mode: int = 0 class MultitestCommand(BaseCommand): @@ -741,6 +742,34 @@ class MultitestCommand(BaseCommand): return ','.join(n.lower() for n in node_ids) return None + def _get_routing_info_path_byte_length(self, routing_info: dict) -> int: + """Best-effort extraction of path byte length from routing_info.""" + if not routing_info: + return 0 + + raw_path_byte_length = routing_info.get('path_byte_length') + if isinstance(raw_path_byte_length, int) and raw_path_byte_length >= 0: + return raw_path_byte_length + + bytes_per_hop = routing_info.get('bytes_per_hop') + path_length = routing_info.get('path_length') + if ( + isinstance(bytes_per_hop, int) + and bytes_per_hop >= 0 + and isinstance(path_length, int) + and path_length >= 0 + ): + return bytes_per_hop * path_length + + path_nodes = routing_info.get('path_nodes') or [] + total = 0 + for node in path_nodes: + node_str = str(node).strip() + if not node_str: + continue + total += len(node_str) // 2 + return total + def get_rf_data_for_message(self, message: MeshMessage) -> Optional[dict]: """Get RF data for a message by looking it up in recent RF data""" try: @@ -831,6 +860,11 @@ class MultitestCommand(BaseCommand): # CRITICAL: Only collect paths if this message has the same hash as the target # This ensures we only track variations of the same original message if message_hash == session.target_packet_hash: + routing_info = rf_data.get('routing_info', {}) + path_byte_length = self._get_routing_info_path_byte_length(routing_info) + if not self._path_bytes_match_requirement(path_byte_length, session.required_path_bytes_mode): + continue + # Try to extract path from RF data first (more reliable) path = self.extract_path_from_rf_data(rf_data) @@ -843,7 +877,6 @@ class MultitestCommand(BaseCommand): self.logger.info(f"✓ Collected path for user {user_id}: {path} (hash: {message_hash[:8]}...)") else: # Log when we have a matching hash but can't extract path - routing_info = rf_data.get('routing_info', {}) path_length = routing_info.get('path_length', 0) if path_length == 0: self.logger.debug(f"Matched hash {message_hash[:8]}... but path is direct (0 hops) for user {user_id}") @@ -880,6 +913,11 @@ class MultitestCommand(BaseCommand): # CRITICAL: Only process if hash matches exactly and is not None/empty if packet_hash and packet_hash == session.target_packet_hash: + routing_info = rf_data.get('routing_info', {}) + path_byte_length = self._get_routing_info_path_byte_length(routing_info) + if not self._path_bytes_match_requirement(path_byte_length, session.required_path_bytes_mode): + continue + matching_count += 1 # Extract path from this RF data path = self.extract_path_from_rf_data(rf_data) @@ -902,6 +940,9 @@ class MultitestCommand(BaseCommand): async def execute(self, message: MeshMessage) -> bool: """Execute the multitest command""" user_id = message.sender_id or "unknown" + required_path_bytes_mode = self._get_required_path_bytes_setting('Multitest_Command') + if not await self.enforce_path_byte_requirement(message, 'Multitest_Command'): + return True # Use lock to prevent concurrent execution from interfering async with self._get_execution_lock(): @@ -985,7 +1026,8 @@ class MultitestCommand(BaseCommand): listening_start_time=time.time(), listening_duration=listening_duration, collected_paths=set(), - initial_path=initial_path + initial_path=initial_path, + required_path_bytes_mode=required_path_bytes_mode, ) # Add initial path if available diff --git a/modules/commands/path_command.py b/modules/commands/path_command.py index dc43942..f9a9d17 100644 --- a/modules/commands/path_command.py +++ b/modules/commands/path_command.py @@ -197,6 +197,9 @@ class PathCommand(BaseCommand): """Execute path decode command""" self.logger.info(f"Path command executed with content: {message.content}") + if not await self.enforce_path_byte_requirement(message, 'Path_Command'): + return True + # Store the current message for use in _extract_path_from_recent_messages self._current_message = message diff --git a/modules/commands/test_command.py b/modules/commands/test_command.py index 57b4874..fe833a1 100644 --- a/modules/commands/test_command.py +++ b/modules/commands/test_command.py @@ -710,6 +710,9 @@ class TestCommand(BaseCommand): Returns: bool: True if execution was successful. """ + if not await self.enforce_path_byte_requirement(message, 'Test_Command'): + return True + # Store the current message for use in location lookups self._current_message = message return await self.handle_keyword_match(message) diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index 3a38e58..60a302a 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -704,88 +704,15 @@ class PacketCaptureService(BaseServicePlugin): snr = str(payload.get('snr', 'Unknown')) rssi = str(payload.get('rssi', 'Unknown')) - # Get packet hash - check multiple sources in order of preference, then calculate if needed - # This matches the original script's approach: use hash from routing_info if available, otherwise calculate - packet_hash = '0000000000000000' + # Get packet hash from decoded packet_info — same clean bytes as the upload's "raw" field, + # so this is always the correct hash (matches what other observers compute). + packet_hash = packet_info.get('packet_hash', '0000000000000000') - # 1. Check if payload has packet_hash directly (from bot's processing) - if isinstance(payload, dict): - if 'packet_hash' in payload: - packet_hash = payload['packet_hash'] - # 2. Check if payload has routing_info with packet_hash - elif 'routing_info' in payload: - routing_info = payload.get('routing_info', {}) - if isinstance(routing_info, dict) and 'packet_hash' in routing_info: - packet_hash = routing_info['packet_hash'] - - # 3. Check metadata if available - if packet_hash == '0000000000000000' and metadata and isinstance(metadata, dict): - if 'packet_hash' in metadata: - packet_hash = metadata['packet_hash'] - elif 'routing_info' in metadata: - routing_info = metadata.get('routing_info', {}) - if isinstance(routing_info, dict) and 'packet_hash' in routing_info: - packet_hash = routing_info['packet_hash'] - - # 4. Try to get from bot's recent_rf_data cache (if message_handler has processed it) - # Note: The bot stores full raw_hex (with framing bytes) for packet_prefix, but we use stripped raw_hex - # So we need to use the full raw_hex from payload for prefix matching - # Optimized: Use indexed rf_data_by_pubkey for O(1) lookup instead of linear search - if packet_hash == '0000000000000000' and hasattr(self.bot, 'message_handler'): - try: - message_handler = self.bot.message_handler - - # Get full raw_hex from payload for prefix matching (bot uses full raw_hex for packet_prefix) - full_raw_hex = payload.get('raw_hex', '') - if full_raw_hex: - packet_prefix = full_raw_hex.replace('0x', '')[:32] if len(full_raw_hex.replace('0x', '')) >= 32 else full_raw_hex.replace('0x', '') - else: - # Fallback: use stripped raw_hex (might not match, but worth trying) - clean_raw_hex_for_lookup = raw_hex.replace('0x', '') - packet_prefix = clean_raw_hex_for_lookup[:32] if len(clean_raw_hex_for_lookup) >= 32 else clean_raw_hex_for_lookup - - # Use indexed lookup (O(1)) instead of linear search (O(n)) - if hasattr(message_handler, 'rf_data_by_pubkey') and packet_prefix: - rf_data_list = message_handler.rf_data_by_pubkey.get(packet_prefix, []) - # Check most recent entries first (last in list, since they're appended in order) - for rf_data in reversed(rf_data_list): - if 'packet_hash' in rf_data: - packet_hash = rf_data['packet_hash'] - break - elif 'routing_info' in rf_data: - routing_info = rf_data.get('routing_info', {}) - if isinstance(routing_info, dict) and 'packet_hash' in routing_info: - packet_hash = routing_info['packet_hash'] - break - - # Fallback to linear search only if indexed lookup not available (backward compatibility) - if packet_hash == '0000000000000000' and hasattr(message_handler, 'recent_rf_data'): - for rf_data in message_handler.recent_rf_data: - # Match by packet_prefix (bot uses full raw_hex for this) - if rf_data.get('packet_prefix') == packet_prefix: - if 'packet_hash' in rf_data: - packet_hash = rf_data['packet_hash'] - break - elif 'routing_info' in rf_data: - routing_info = rf_data.get('routing_info', {}) - if isinstance(routing_info, dict) and 'packet_hash' in routing_info: - packet_hash = routing_info['packet_hash'] - break - except Exception as e: - if self.debug: - self.logger.debug(f"Error checking recent_rf_data for hash: {e}") - - # 5. Fall back to hash from decoded packet_info (should be calculated correctly) - if packet_hash == '0000000000000000': - packet_hash = packet_info.get('packet_hash', '0000000000000000') - - # 6. If still no hash, calculate it from raw_hex (matches original script's format_packet_data) + # Only fall back to direct calculation if decode_packet didn't produce a hash if packet_hash == '0000000000000000': try: - # Use payload_type_value from packet_info if available, otherwise None (will be extracted from header) payload_type_value = packet_info.get('payload_type_value') if payload_type_value is not None: - # Ensure it's an integer (handle enum.value if passed) if hasattr(payload_type_value, 'value'): payload_type_value = payload_type_value.value payload_type_value = int(payload_type_value) & 0x0F diff --git a/modules/utils.py b/modules/utils.py index b6e53c4..d0dcb52 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -529,9 +529,8 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s if payload_type == 9: # PAYLOAD_TYPE_TRACE # C++ does: sha.update(&path_len, sizeof(path_len)) - # path_len is uint16_t, so sizeof(path_len) = 2 bytes - # Convert path_len to 2-byte little-endian uint16_t - hash_obj.update(path_byte_length.to_bytes(2, byteorder='little')) + # path_len is the raw wire byte (uint16_t in firmware), not the decoded byte count + hash_obj.update(path_len_byte.to_bytes(2, byteorder='little')) hash_obj.update(payload_data) diff --git a/tests/unit/test_command_path_byte_gating.py b/tests/unit/test_command_path_byte_gating.py new file mode 100644 index 0000000..ff3e0ec --- /dev/null +++ b/tests/unit/test_command_path_byte_gating.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""Unit tests for path-byte gating across test/multitest/path commands.""" + +import configparser +from unittest.mock import AsyncMock, MagicMock, Mock + +import pytest + +from modules.commands.multitest_command import MultitestCommand, MultitestSession +from modules.commands.path_command import PathCommand +from modules.commands.test_command import TestCommand as MeshTestCommand +from tests.conftest import mock_message + + +def _base_bot() -> MagicMock: + bot = MagicMock() + bot.logger = Mock() + bot.config = configparser.ConfigParser() + bot.config.add_section("Bot") + bot.config.set("Bot", "bot_name", "TestBot") + bot.config.add_section("Channels") + bot.config.set("Channels", "monitor_channels", "general") + bot.config.set("Channels", "respond_to_dms", "true") + bot.config.add_section("Keywords") + bot.config.set("Keywords", "test", "ack") + bot.command_manager = MagicMock() + bot.command_manager.monitor_channels = ["general"] + bot.command_manager.send_response = AsyncMock(return_value=True) + bot.translator = MagicMock() + bot.translator.translate = Mock(side_effect=lambda key, **kwargs: key) + bot.prefix_hex_chars = 2 + return bot + + +@pytest.mark.asyncio +async def test_test_command_rejects_silently_when_path_bytes_too_short(): + bot = _base_bot() + bot.config.add_section("Test_Command") + bot.config.set("Test_Command", "enabled", "true") + bot.config.set("Test_Command", "require_path_bytes_greater_or_equal_to", "2") + bot.config.add_section("Path_Command") + bot.config.set("Path_Command", "recency_weight", "0.2") + + cmd = MeshTestCommand(bot) + cmd.handle_keyword_match = AsyncMock(return_value=True) + message = mock_message(content="test", channel="general", routing_info={"path_byte_length": 1}) + + result = await cmd.execute(message) + + assert result is True + cmd.handle_keyword_match.assert_not_called() + bot.command_manager.send_response.assert_not_called() + + +@pytest.mark.asyncio +async def test_test_command_sends_failure_response_when_configured(): + bot = _base_bot() + bot.config.add_section("Test_Command") + bot.config.set("Test_Command", "enabled", "true") + bot.config.set("Test_Command", "require_path_bytes_greater_or_equal_to", "3") + bot.config.set("Test_Command", "require_path_bytes_failure_response", "Need 3-byte path") + bot.config.add_section("Path_Command") + bot.config.set("Path_Command", "recency_weight", "0.2") + + cmd = MeshTestCommand(bot) + cmd.handle_keyword_match = AsyncMock(return_value=True) + message = mock_message(content="test", channel="general", routing_info={"path_byte_length": 2}) + + result = await cmd.execute(message) + + assert result is True + cmd.handle_keyword_match.assert_not_called() + bot.command_manager.send_response.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_path_command_mode_one_behaves_like_zero(): + bot = _base_bot() + bot.config.add_section("Path_Command") + bot.config.set("Path_Command", "enabled", "true") + bot.config.set("Path_Command", "require_path_bytes_greater_or_equal_to", "1") + + cmd = PathCommand(bot) + cmd._send_path_response = AsyncMock(return_value=True) + cmd._extract_path_from_recent_messages = AsyncMock(return_value="decoded") + message = mock_message(content="path", channel="general", routing_info={"path_byte_length": 0}) + + result = await cmd.execute(message) + + assert result is True + cmd._extract_path_from_recent_messages.assert_awaited_once() + cmd._send_path_response.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_multitest_execute_rejects_with_custom_failure_message(): + bot = _base_bot() + bot.config.add_section("Multitest_Command") + bot.config.set("Multitest_Command", "enabled", "true") + bot.config.set("Multitest_Command", "require_path_bytes_greater_or_equal_to", "2") + bot.config.set("Multitest_Command", "require_path_bytes_failure_response", "Need >=2 path bytes") + + cmd = MultitestCommand(bot) + message = mock_message(content="multitest", channel="general", routing_info={"path_byte_length": 1}) + + result = await cmd.execute(message) + + assert result is True + bot.command_manager.send_response.assert_awaited_once() + + +def test_multitest_listener_filters_matching_hash_by_required_path_bytes(): + bot = _base_bot() + bot.config.add_section("Multitest_Command") + bot.config.set("Multitest_Command", "enabled", "true") + cmd = MultitestCommand(bot) + cmd.extract_path_from_rf_data = Mock(return_value="0101") + cmd.get_rf_data_for_message = Mock( + return_value={ + "packet_hash": "abc123", + "routing_info": {"path_byte_length": 2, "path_nodes": ["0101"]}, + } + ) + + session = MultitestSession( + user_id="alice", + target_packet_hash="abc123", + triggering_timestamp=0.0, + listening_start_time=0.0, + listening_duration=9999.0, + collected_paths=set(), + required_path_bytes_mode=3, + ) + cmd._active_sessions["alice"] = session + message = mock_message(content="x", sender_id="alice") + + cmd.on_message_received(message) + + assert session.collected_paths == set() diff --git a/tests/unit/test_packet_capture_trace_decode.py b/tests/unit/test_packet_capture_trace_decode.py index 8db1a04..f94fc31 100644 --- a/tests/unit/test_packet_capture_trace_decode.py +++ b/tests/unit/test_packet_capture_trace_decode.py @@ -1,8 +1,40 @@ """Packet capture TRACE decode: path must come from payload, not RF SNR bytes.""" +import hashlib import logging from modules.service_plugins.packet_capture_service import PacketCaptureService +from modules.utils import calculate_packet_hash + + +def test_calculate_packet_hash_trace_uses_wire_byte(): + """TRACE hash must use the raw path_len wire byte, not the decoded byte count. + + When size_code > 0 (multi-byte-per-hop paths), path_len_byte != path_byte_length. + The firmware hashes path_len_byte (the raw wire value), so we must too. + """ + # header 0x26: route_type=DIRECT(2), payload_type=TRACE(9), version=VER_1(0) + # path_len_byte 0x42: size_code=1 → 2 bytes/hop, hop_count=2 → path_byte_length=4 + # path: AABBCCDD (4 bytes), payload: 01020304 + raw_hex = "2642AABBCCDD01020304" + + # Correct: use path_len_byte=0x42 (66), matching firmware Packet::calculatePacketHash() + expected = hashlib.sha256( + bytes([0x09]) + + (0x42).to_bytes(2, "little") + + bytes.fromhex("01020304") + ).hexdigest()[:16].upper() + + result = calculate_packet_hash(raw_hex, 9) + assert result == expected, f"Expected {expected}, got {result}" + + # Confirm this differs from the old buggy value (path_byte_length=4 instead of wire byte 0x42) + buggy = hashlib.sha256( + bytes([0x09]) + + (4).to_bytes(2, "little") + + bytes.fromhex("01020304") + ).hexdigest()[:16].upper() + assert result != buggy, "Hash must not match old path_byte_length computation" def test_decode_packet_trace_uses_payload_route_hashes():