diff --git a/config.ini.example b/config.ini.example index b9d3228..a0bfb46 100644 --- a/config.ini.example +++ b/config.ini.example @@ -1493,6 +1493,14 @@ raw_duplicate_window = 2.0 # Keep RX_LOG SNR/RSSI keyed by packet prefix this long for RAW_DATA correlation (seconds). rf_data_cache_timeout = 15.0 +# If true, do not publish packet JSON to MQTT when content hash is 0000000000000000 +# (strict path_len reject, too-short buffer, or other unparseable cases). File output is unchanged. +mqtt_skip_unparseable_packets = true + +# If true, do not publish ADVERT packets to MQTT when mesh payload fails Ed25519 verification +# (MeshCore createAdvert signature over pubkey || timestamp || app_data). File output is unchanged. +advert_require_valid_signature = false + # Owner information (for packet analyzer registration) # Owner public key (64-character hex string) owner_public_key = diff --git a/docs/packet-capture.md b/docs/packet-capture.md index be1f91f..a88d17f 100644 --- a/docs/packet-capture.md +++ b/docs/packet-capture.md @@ -45,6 +45,11 @@ enabled = true # Enable packet capture output_file = packets.json # Optional: save to file verbose = false # Detailed packet logging debug = false # Debug mode +mqtt_skip_unparseable_packets = true # Skip MQTT when content hash is all zeros (strict path reject / short buffer) + +# Optional: skip MQTT for ADVERT packets whose Ed25519 signature does not verify (damaged or spoofed mesh payload). +# Does not affect file/JSONL capture. +advert_require_valid_signature = false ``` ### Authentication diff --git a/modules/message_handler.py b/modules/message_handler.py index 880ea98..72021ba 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -1350,7 +1350,13 @@ class MessageHandler: path_len_byte = byte_data[offset] offset += 1 # Decode per firmware: low 6 bits = hop count, high 2 bits = size code (bytes_per_hop = code+1) - path_byte_length, bytes_per_hop = decode_path_len_byte(path_len_byte) + path_parts = decode_path_len_byte(path_len_byte) + if path_parts is None: + self.logger.debug( + "decode_meshcore_packet: invalid path_len byte (firmware would reject)" + ) + return None + path_byte_length, bytes_per_hop = path_parts # Check if we have enough data for the full path if len(byte_data) < offset + path_byte_length: @@ -1375,7 +1381,7 @@ class MessageHandler: # Extract payload type (bits 2-5) payload_type = PayloadType((header >> 2) & 0x0F) - # Chunk path by bytes_per_hop from packet (1, 2, or 3; legacy fallback uses 1) + # Chunk path by bytes_per_hop from packet (1, 2, or 3) path_hex, path_values = self._path_bytes_to_nodes(path_bytes, prefix_hex_chars=bytes_per_hop * 2) # Process path based on packet type diff --git a/modules/service_plugins/map_uploader_service.py b/modules/service_plugins/map_uploader_service.py index a5d78d1..9eb1e65 100644 --- a/modules/service_plugins/map_uploader_service.py +++ b/modules/service_plugins/map_uploader_service.py @@ -529,7 +529,10 @@ class MapUploaderService(BaseServicePlugin): path_len_byte = byte_data[offset] offset += 1 - path_byte_length, _ = decode_path_len_byte(path_len_byte) + path_parts = decode_path_len_byte(path_len_byte) + if path_parts is None: + return + path_byte_length, _ = path_parts # Skip path if path_byte_length > 0 and len(byte_data) > offset + path_byte_length: diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index 60a302a..c95496a 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -21,7 +21,12 @@ from meshcore import EventType from ..enums import PayloadType, PayloadVersion, RouteType # Import bot's utilities for packet hash -from ..utils import calculate_packet_hash, decode_path_len_byte, parse_trace_payload_route_hashes +from ..utils import ( + calculate_packet_hash, + decode_path_len_byte, + parse_trace_payload_route_hashes, + verify_meshcore_advert_ed25519, +) from ..version_info import resolve_runtime_version # Import MQTT client @@ -210,6 +215,16 @@ class PacketCaptureService(BaseServicePlugin): 'PacketCapture', 'rf_data_cache_timeout', fallback=15.0 ) + # Do not publish to MQTT when content hash is unknown (zeros) — unparseable / strict path reject + self.mqtt_skip_unparseable_packets = config.getboolean( + 'PacketCapture', 'mqtt_skip_unparseable_packets', fallback=True + ) + + # Skip MQTT for ADVERT packets that fail Ed25519 verify (mesh payload corruption) + self.advert_require_valid_signature = config.getboolean( + 'PacketCapture', 'advert_require_valid_signature', fallback=False + ) + # Note: Python signing can fetch private key from device if not provided via file # The create_auth_token_async function will automatically try to export the key # from the device if private_key_hex is None and meshcore_instance is available @@ -807,6 +822,22 @@ class PacketCaptureService(BaseServicePlugin): # Format packet data to match original script's format formatted_packet = self._format_packet_data(raw_hex, packet_info, payload, metadata) + skip_mqtt_unparseable = ( + self.mqtt_skip_unparseable_packets + and formatted_packet.get('hash') == '0000000000000000' + ) + + skip_mqtt_invalid_advert_signature = False + if self.advert_require_valid_signature and packet_info.get('payload_type') == PayloadType.ADVERT.name: + try: + mesh_payload = bytes.fromhex(packet_info.get('payload_hex', '')) + if not verify_meshcore_advert_ed25519(mesh_payload): + skip_mqtt_invalid_advert_signature = True + except Exception: + skip_mqtt_invalid_advert_signature = True + + skip_mqtt = skip_mqtt_unparseable or skip_mqtt_invalid_advert_signature + # Write to file if self.output_handle: self.output_handle.write(json.dumps(formatted_packet, default=str) + '\n') @@ -814,14 +845,32 @@ class PacketCaptureService(BaseServicePlugin): # Publish to MQTT if enabled # The publish function will check per-broker connection status - publish_metrics = {"attempted": 0, "succeeded": 0} - if self.mqtt_enabled: + publish_metrics: dict[str, Any] = { + "attempted": 0, + "succeeded": 0, + "skipped_by_filter": False, + "skipped_unparseable": False, + "skipped_invalid_advert_signature": False, + } + if self.mqtt_enabled and not skip_mqtt: if self.debug: self.logger.debug(f"Calling publish_packet_mqtt for packet {self.packet_count}") publish_metrics = await self.publish_packet_mqtt(formatted_packet) + publish_metrics.setdefault("skipped_unparseable", False) + publish_metrics.setdefault("skipped_invalid_advert_signature", False) + elif self.mqtt_enabled and skip_mqtt: + publish_metrics["skipped_unparseable"] = skip_mqtt_unparseable + publish_metrics["skipped_invalid_advert_signature"] = skip_mqtt_invalid_advert_signature # Log DEBUG level for each packet (verbose; use INFO only for service lifecycle) - action = "Skipping" if publish_metrics.get("skipped_by_filter") else "Captured" + if publish_metrics.get("skipped_unparseable"): + action = "Captured (MQTT skipped: zero hash / unparseable)" + elif publish_metrics.get("skipped_invalid_advert_signature"): + action = "Captured (MQTT skipped: invalid advert signature)" + elif publish_metrics.get("skipped_by_filter"): + action = "Skipping" + else: + action = "Captured" self.logger.debug(f"📦 {action} packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})") # Output full packet data structure in debug mode only (matches original script) @@ -879,7 +928,14 @@ class PacketCaptureService(BaseServicePlugin): path_len_byte = byte_data[offset] offset += 1 - path_byte_length, bytes_per_hop = decode_path_len_byte(path_len_byte) + path_parts = decode_path_len_byte(path_len_byte) + if path_parts is None: + if self.debug: + self.logger.debug( + "Packet invalid path_len encoding (not firmware-valid), cannot decode" + ) + return None + path_byte_length, bytes_per_hop = path_parts if len(byte_data) < offset + path_byte_length: if self.debug: @@ -890,7 +946,7 @@ class PacketCaptureService(BaseServicePlugin): path_bytes = byte_data[offset:offset + path_byte_length] offset += path_byte_length - # Chunk path by bytes_per_hop from packet (1, 2, or 3; legacy fallback uses 1) + # Chunk path by bytes_per_hop from packet (1, 2, or 3); odd remainder → 1-byte chunks hex_chars = bytes_per_hop * 2 path_hex = path_bytes.hex() path_nodes = [path_hex[i:i + hex_chars].upper() for i in range(0, len(path_hex), hex_chars)] diff --git a/modules/utils.py b/modules/utils.py index 3584e56..8f5c2bf 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -389,30 +389,29 @@ def get_major_city_queries(city: str, state_abbr: Optional[str] = None) -> list[ return [] -def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple: - """Decode the RF packet path_len byte per firmware (Packet.cpp). +def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple[int, int] | None: + """Decode the RF packet path_len byte per firmware ``Packet::isValidPathLen``. Encoding: low 6 bits = hop count, high 2 bits = size code. - bytes_per_hop = (path_len >> 6) + 1 → 1, 2, 3, or 4 (4 is reserved and invalid). + ``bytes_per_hop = (path_len >> 6) + 1`` → 1, 2, 3, or 4 (4 is reserved and invalid). Args: path_len_byte: The single path_len byte from the packet. - max_path_size: Max path bytes (default 64, matches MAX_PATH_SIZE). + max_path_size: Max path bytes (default 64, matches ``MAX_PATH_SIZE``). Returns: - Tuple of (path_byte_length, bytes_per_hop). If encoding is invalid - (hash_size==4 or hop_count*bytes_per_hop > max_path_size), returns - (path_len_byte, 1) for legacy: path_len is raw byte count, 1 byte per hop. + ``(path_byte_length, bytes_per_hop)`` if the encoding is valid on the wire. + ``None`` if reserved size class (4) or ``hop_count * bytes_per_hop > max_path_size`` + — matching MeshCore where ``readFrom`` rejects the packet (no legacy reinterpretation). """ hop_count = path_len_byte & 63 size_code = path_len_byte >> 6 bytes_per_hop = size_code + 1 # 1, 2, 3, or 4 if bytes_per_hop == 4: - # Reserved in firmware, invalid - return (path_len_byte, 1) + return None path_byte_length = hop_count * bytes_per_hop if path_byte_length > max_path_size: - return (path_len_byte, 1) + return None return (path_byte_length, bytes_per_hop) @@ -505,7 +504,10 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s path_len_byte = byte_data[offset] offset += 1 - path_byte_length, _ = decode_path_len_byte(path_len_byte) + path_parts = decode_path_len_byte(path_len_byte) + if path_parts is None: + return "0000000000000000" + path_byte_length, _ = path_parts # Validate we have enough bytes for the path if len(byte_data) < offset + path_byte_length: @@ -541,6 +543,26 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s return "0000000000000000" +def verify_meshcore_advert_ed25519(mesh_payload: bytes) -> bool: + """Verify MeshCore ADVERT Ed25519 signature (layout from ``Mesh::createAdvert``). + + Signed message is ``pub_key (32) + timestamp (4, LE) + app_data``; signature is + ``payload[36:100]`` (64 bytes); ``app_data`` starts at byte 100. + """ + if len(mesh_payload) < 100: + return False + try: + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + pub = mesh_payload[:32] + msg = mesh_payload[:36] + mesh_payload[100:] + sig = mesh_payload[36:100] + Ed25519PublicKey.from_public_bytes(pub).verify(sig, msg) + return True + except Exception: + return False + + def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate haversine distance between two points in kilometers. diff --git a/tests/test_utils.py b/tests/test_utils.py index ac0f12b..28468b4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -28,6 +28,7 @@ from modules.utils import ( parse_trace_payload_route_hashes, resolve_path, truncate_string, + verify_meshcore_advert_ed25519, ) @@ -185,50 +186,60 @@ class TestDecodePathLenByte: def test_single_byte_one_hop(self): # size_code=0 -> 1 byte/hop, hop_count=1 -> 1 path byte - path_byte_length, bytes_per_hop = decode_path_len_byte(0x01) + decoded = decode_path_len_byte(0x01) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 1 assert bytes_per_hop == 1 def test_single_byte_three_hops(self): - path_byte_length, bytes_per_hop = decode_path_len_byte(0x03) + decoded = decode_path_len_byte(0x03) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 3 assert bytes_per_hop == 1 def test_multi_byte_two_bytes_per_hop_one_hop(self): # size_code=1 -> 2 bytes/hop, hop_count=1 -> 2 path bytes - path_byte_length, bytes_per_hop = decode_path_len_byte(0x41) + decoded = decode_path_len_byte(0x41) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 2 assert bytes_per_hop == 2 def test_multi_byte_two_bytes_per_hop_three_hops(self): # size_code=1, hop_count=3 -> 6 path bytes - path_byte_length, bytes_per_hop = decode_path_len_byte(0x43) + decoded = decode_path_len_byte(0x43) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 6 assert bytes_per_hop == 2 def test_three_bytes_per_hop(self): # size_code=2 -> 3 bytes/hop, hop_count=2 -> 6 path bytes - path_byte_length, bytes_per_hop = decode_path_len_byte(0x82) + decoded = decode_path_len_byte(0x82) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 6 assert bytes_per_hop == 3 - def test_reserved_size_code_fallback(self): - # size_code=3 (bytes_per_hop=4) is reserved -> legacy: path_len_byte as raw byte count, 1 byte/hop - path_byte_length, bytes_per_hop = decode_path_len_byte(0xC2) - assert path_byte_length == 0xC2 # raw byte value - assert bytes_per_hop == 1 + def test_reserved_size_code_returns_none(self): + # size_code=3 (bytes_per_hop=4) is reserved — firmware rejects + assert decode_path_len_byte(0xC2) is None - def test_path_exceeds_max_fallback(self): - # 32 hops * 2 bytes = 64, max_path_size=64 is ok; 33*2=66 > 64 -> legacy - path_byte_length, bytes_per_hop = decode_path_len_byte(0x41, max_path_size=64) + def test_path_exceeds_max_returns_none(self): + # 32 hops * 2 bytes = 64, max_path_size=64 is ok; 33*2=66 > 64 — reject + decoded = decode_path_len_byte(0x41, max_path_size=64) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 2 assert bytes_per_hop == 2 - path_byte_length, bytes_per_hop = decode_path_len_byte(0x61, max_path_size=64) # 33 hops * 2 - assert path_byte_length == 0x61 - assert bytes_per_hop == 1 + assert decode_path_len_byte(0x61, max_path_size=64) is None # 33 hops * 2 def test_zero_hops(self): - path_byte_length, bytes_per_hop = decode_path_len_byte(0x00) + decoded = decode_path_len_byte(0x00) + assert decoded is not None + path_byte_length, bytes_per_hop = decoded assert path_byte_length == 0 assert bytes_per_hop == 1 @@ -258,15 +269,18 @@ class TestEncodePathLenByte: def test_three_hops_two_bytes_per_hop(self): assert encode_path_len_byte(3, 2) == 0x43 - pbl, bph = decode_path_len_byte(0x43) + dec = decode_path_len_byte(0x43) + assert dec is not None + pbl, bph = dec assert pbl == 6 assert bph == 2 def test_roundtrip_with_decode(self): for pb in (0x01, 0x03, 0x41, 0x43, 0x82, 0x00): - pbl, bph = decode_path_len_byte(pb) - if bph == 1 and pbl == pb: - continue # legacy fallback path + dec = decode_path_len_byte(pb) + if dec is None: + continue + pbl, bph = dec hop_count = pb & 0x3F assert encode_path_len_byte(hop_count, bph) == pb @@ -703,6 +717,64 @@ class TestFormatKeywordResponseWithPlaceholders: assert "SNR" in result assert "RSSI" in result +class TestVerifyMeshcoreAdvertEd25519: + """Tests for verify_meshcore_advert_ed25519() matching MeshCore createAdvert signing.""" + + def test_too_short_returns_false(self): + assert verify_meshcore_advert_ed25519(b"\x00" * 99) is False + + def test_valid_signed_payload_verifies(self): + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + ts = (1718319158).to_bytes(4, "little") + app_data = b"\x92TestNode" # flags + short name; total mesh payload > 100 + msg = pub + ts + app_data + sig = priv.sign(msg) + mesh_payload = pub + ts + sig + app_data + assert len(mesh_payload) >= 100 + assert verify_meshcore_advert_ed25519(mesh_payload) is True + + def test_tampered_app_data_fails(self): + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + ts = (1).to_bytes(4, "little") + app_data = b"\x01" * 8 + msg = pub + ts + app_data + sig = priv.sign(msg) + mesh_payload = bytearray(pub + ts + sig + app_data) + mesh_payload[-1] ^= 0xFF + assert verify_meshcore_advert_ed25519(bytes(mesh_payload)) is False + + def test_tampered_signature_fails(self): + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + ts = (2).to_bytes(4, "little") + app_data = b"\x02" * 8 + msg = pub + ts + app_data + sig = priv.sign(msg) + mesh_payload = bytearray(pub + ts + sig + app_data) + mesh_payload[40] ^= 0xFF + assert verify_meshcore_advert_ed25519(bytes(mesh_payload)) is False + + class TestCalculatePacketHashPathLength: """Tests that calculate_packet_hash uses decode_path_len_byte so multi-byte paths skip correctly.""" @@ -786,6 +858,12 @@ class TestCalculatePacketHashEdgeCases: h = calculate_packet_hash("not-valid-hex!!!") assert h == "0000000000000000" + def test_invalid_path_len_encoding_returns_default_hash(self): + """Reserved path size class (hash_size==4) → strict decode fails → default hash.""" + # FLOOD + TXT_MSG, path_len byte 0xF5 (size code 3 → bytes_per_hop=4 reserved) + h = calculate_packet_hash("09F500") + assert h == "0000000000000000" + def test_non_transport_type_skips_transport_bytes(self): """Route type 1 (FLOOD) → no transport bytes; hash succeeds.""" # Header 0x09 = FLOOD + TXT_MSG, path_len=0, payload=0xFF