mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-05-10 17:55:18 +00:00
feat(packet_capture): enhance MQTT packet publishing controls and path length decoding
- Added configuration options `mqtt_skip_unparseable_packets` and `advert_require_valid_signature` to control MQTT publishing behavior based on packet validity. - Updated `decode_path_len_byte` function to return `None` for reserved size codes, improving path length validation. - Implemented logic in `PacketCaptureService` to skip publishing unparseable packets and ADVERT packets with invalid signatures. - Introduced `verify_meshcore_advert_ed25519` function for signature verification of ADVERT packets, with corresponding unit tests to ensure functionality. - Enhanced documentation to reflect new configuration options and their effects on packet processing.
This commit is contained in:
@@ -1493,6 +1493,14 @@ raw_duplicate_window = 2.0
|
||||
# Keep RX_LOG SNR/RSSI keyed by packet prefix this long for RAW_DATA correlation (seconds).
|
||||
rf_data_cache_timeout = 15.0
|
||||
|
||||
# If true, do not publish packet JSON to MQTT when content hash is 0000000000000000
|
||||
# (strict path_len reject, too-short buffer, or other unparseable cases). File output is unchanged.
|
||||
mqtt_skip_unparseable_packets = true
|
||||
|
||||
# If true, do not publish ADVERT packets to MQTT when mesh payload fails Ed25519 verification
|
||||
# (MeshCore createAdvert signature over pubkey || timestamp || app_data). File output is unchanged.
|
||||
advert_require_valid_signature = false
|
||||
|
||||
# Owner information (for packet analyzer registration)
|
||||
# Owner public key (64-character hex string)
|
||||
owner_public_key =
|
||||
|
||||
@@ -45,6 +45,11 @@ enabled = true # Enable packet capture
|
||||
output_file = packets.json # Optional: save to file
|
||||
verbose = false # Detailed packet logging
|
||||
debug = false # Debug mode
|
||||
mqtt_skip_unparseable_packets = true # Skip MQTT when content hash is all zeros (strict path reject / short buffer)
|
||||
|
||||
# Optional: skip MQTT for ADVERT packets whose Ed25519 signature does not verify (damaged or spoofed mesh payload).
|
||||
# Does not affect file/JSONL capture.
|
||||
advert_require_valid_signature = false
|
||||
```
|
||||
|
||||
### Authentication
|
||||
|
||||
@@ -1350,7 +1350,13 @@ class MessageHandler:
|
||||
path_len_byte = byte_data[offset]
|
||||
offset += 1
|
||||
# Decode per firmware: low 6 bits = hop count, high 2 bits = size code (bytes_per_hop = code+1)
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(path_len_byte)
|
||||
path_parts = decode_path_len_byte(path_len_byte)
|
||||
if path_parts is None:
|
||||
self.logger.debug(
|
||||
"decode_meshcore_packet: invalid path_len byte (firmware would reject)"
|
||||
)
|
||||
return None
|
||||
path_byte_length, bytes_per_hop = path_parts
|
||||
|
||||
# Check if we have enough data for the full path
|
||||
if len(byte_data) < offset + path_byte_length:
|
||||
@@ -1375,7 +1381,7 @@ class MessageHandler:
|
||||
# Extract payload type (bits 2-5)
|
||||
payload_type = PayloadType((header >> 2) & 0x0F)
|
||||
|
||||
# Chunk path by bytes_per_hop from packet (1, 2, or 3; legacy fallback uses 1)
|
||||
# Chunk path by bytes_per_hop from packet (1, 2, or 3)
|
||||
path_hex, path_values = self._path_bytes_to_nodes(path_bytes, prefix_hex_chars=bytes_per_hop * 2)
|
||||
|
||||
# Process path based on packet type
|
||||
|
||||
@@ -529,7 +529,10 @@ class MapUploaderService(BaseServicePlugin):
|
||||
|
||||
path_len_byte = byte_data[offset]
|
||||
offset += 1
|
||||
path_byte_length, _ = decode_path_len_byte(path_len_byte)
|
||||
path_parts = decode_path_len_byte(path_len_byte)
|
||||
if path_parts is None:
|
||||
return
|
||||
path_byte_length, _ = path_parts
|
||||
|
||||
# Skip path
|
||||
if path_byte_length > 0 and len(byte_data) > offset + path_byte_length:
|
||||
|
||||
@@ -21,7 +21,12 @@ from meshcore import EventType
|
||||
from ..enums import PayloadType, PayloadVersion, RouteType
|
||||
|
||||
# Import bot's utilities for packet hash
|
||||
from ..utils import calculate_packet_hash, decode_path_len_byte, parse_trace_payload_route_hashes
|
||||
from ..utils import (
|
||||
calculate_packet_hash,
|
||||
decode_path_len_byte,
|
||||
parse_trace_payload_route_hashes,
|
||||
verify_meshcore_advert_ed25519,
|
||||
)
|
||||
from ..version_info import resolve_runtime_version
|
||||
|
||||
# Import MQTT client
|
||||
@@ -210,6 +215,16 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
'PacketCapture', 'rf_data_cache_timeout', fallback=15.0
|
||||
)
|
||||
|
||||
# Do not publish to MQTT when content hash is unknown (zeros) — unparseable / strict path reject
|
||||
self.mqtt_skip_unparseable_packets = config.getboolean(
|
||||
'PacketCapture', 'mqtt_skip_unparseable_packets', fallback=True
|
||||
)
|
||||
|
||||
# Skip MQTT for ADVERT packets that fail Ed25519 verify (mesh payload corruption)
|
||||
self.advert_require_valid_signature = config.getboolean(
|
||||
'PacketCapture', 'advert_require_valid_signature', fallback=False
|
||||
)
|
||||
|
||||
# Note: Python signing can fetch private key from device if not provided via file
|
||||
# The create_auth_token_async function will automatically try to export the key
|
||||
# from the device if private_key_hex is None and meshcore_instance is available
|
||||
@@ -807,6 +822,22 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
# Format packet data to match original script's format
|
||||
formatted_packet = self._format_packet_data(raw_hex, packet_info, payload, metadata)
|
||||
|
||||
skip_mqtt_unparseable = (
|
||||
self.mqtt_skip_unparseable_packets
|
||||
and formatted_packet.get('hash') == '0000000000000000'
|
||||
)
|
||||
|
||||
skip_mqtt_invalid_advert_signature = False
|
||||
if self.advert_require_valid_signature and packet_info.get('payload_type') == PayloadType.ADVERT.name:
|
||||
try:
|
||||
mesh_payload = bytes.fromhex(packet_info.get('payload_hex', ''))
|
||||
if not verify_meshcore_advert_ed25519(mesh_payload):
|
||||
skip_mqtt_invalid_advert_signature = True
|
||||
except Exception:
|
||||
skip_mqtt_invalid_advert_signature = True
|
||||
|
||||
skip_mqtt = skip_mqtt_unparseable or skip_mqtt_invalid_advert_signature
|
||||
|
||||
# Write to file
|
||||
if self.output_handle:
|
||||
self.output_handle.write(json.dumps(formatted_packet, default=str) + '\n')
|
||||
@@ -814,14 +845,32 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
|
||||
# Publish to MQTT if enabled
|
||||
# The publish function will check per-broker connection status
|
||||
publish_metrics = {"attempted": 0, "succeeded": 0}
|
||||
if self.mqtt_enabled:
|
||||
publish_metrics: dict[str, Any] = {
|
||||
"attempted": 0,
|
||||
"succeeded": 0,
|
||||
"skipped_by_filter": False,
|
||||
"skipped_unparseable": False,
|
||||
"skipped_invalid_advert_signature": False,
|
||||
}
|
||||
if self.mqtt_enabled and not skip_mqtt:
|
||||
if self.debug:
|
||||
self.logger.debug(f"Calling publish_packet_mqtt for packet {self.packet_count}")
|
||||
publish_metrics = await self.publish_packet_mqtt(formatted_packet)
|
||||
publish_metrics.setdefault("skipped_unparseable", False)
|
||||
publish_metrics.setdefault("skipped_invalid_advert_signature", False)
|
||||
elif self.mqtt_enabled and skip_mqtt:
|
||||
publish_metrics["skipped_unparseable"] = skip_mqtt_unparseable
|
||||
publish_metrics["skipped_invalid_advert_signature"] = skip_mqtt_invalid_advert_signature
|
||||
|
||||
# Log DEBUG level for each packet (verbose; use INFO only for service lifecycle)
|
||||
action = "Skipping" if publish_metrics.get("skipped_by_filter") else "Captured"
|
||||
if publish_metrics.get("skipped_unparseable"):
|
||||
action = "Captured (MQTT skipped: zero hash / unparseable)"
|
||||
elif publish_metrics.get("skipped_invalid_advert_signature"):
|
||||
action = "Captured (MQTT skipped: invalid advert signature)"
|
||||
elif publish_metrics.get("skipped_by_filter"):
|
||||
action = "Skipping"
|
||||
else:
|
||||
action = "Captured"
|
||||
self.logger.debug(f"📦 {action} packet #{self.packet_count}: {formatted_packet['route']} type {formatted_packet['packet_type']}, {formatted_packet['len']} bytes, SNR: {formatted_packet['SNR']}, RSSI: {formatted_packet['RSSI']}, hash: {formatted_packet['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")
|
||||
|
||||
# Output full packet data structure in debug mode only (matches original script)
|
||||
@@ -879,7 +928,14 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
|
||||
path_len_byte = byte_data[offset]
|
||||
offset += 1
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(path_len_byte)
|
||||
path_parts = decode_path_len_byte(path_len_byte)
|
||||
if path_parts is None:
|
||||
if self.debug:
|
||||
self.logger.debug(
|
||||
"Packet invalid path_len encoding (not firmware-valid), cannot decode"
|
||||
)
|
||||
return None
|
||||
path_byte_length, bytes_per_hop = path_parts
|
||||
|
||||
if len(byte_data) < offset + path_byte_length:
|
||||
if self.debug:
|
||||
@@ -890,7 +946,7 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
path_bytes = byte_data[offset:offset + path_byte_length]
|
||||
offset += path_byte_length
|
||||
|
||||
# Chunk path by bytes_per_hop from packet (1, 2, or 3; legacy fallback uses 1)
|
||||
# Chunk path by bytes_per_hop from packet (1, 2, or 3); odd remainder → 1-byte chunks
|
||||
hex_chars = bytes_per_hop * 2
|
||||
path_hex = path_bytes.hex()
|
||||
path_nodes = [path_hex[i:i + hex_chars].upper() for i in range(0, len(path_hex), hex_chars)]
|
||||
|
||||
+33
-11
@@ -389,30 +389,29 @@ def get_major_city_queries(city: str, state_abbr: Optional[str] = None) -> list[
|
||||
return []
|
||||
|
||||
|
||||
def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple:
|
||||
"""Decode the RF packet path_len byte per firmware (Packet.cpp).
|
||||
def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple[int, int] | None:
|
||||
"""Decode the RF packet path_len byte per firmware ``Packet::isValidPathLen``.
|
||||
|
||||
Encoding: low 6 bits = hop count, high 2 bits = size code.
|
||||
bytes_per_hop = (path_len >> 6) + 1 → 1, 2, 3, or 4 (4 is reserved and invalid).
|
||||
``bytes_per_hop = (path_len >> 6) + 1`` → 1, 2, 3, or 4 (4 is reserved and invalid).
|
||||
|
||||
Args:
|
||||
path_len_byte: The single path_len byte from the packet.
|
||||
max_path_size: Max path bytes (default 64, matches MAX_PATH_SIZE).
|
||||
max_path_size: Max path bytes (default 64, matches ``MAX_PATH_SIZE``).
|
||||
|
||||
Returns:
|
||||
Tuple of (path_byte_length, bytes_per_hop). If encoding is invalid
|
||||
(hash_size==4 or hop_count*bytes_per_hop > max_path_size), returns
|
||||
(path_len_byte, 1) for legacy: path_len is raw byte count, 1 byte per hop.
|
||||
``(path_byte_length, bytes_per_hop)`` if the encoding is valid on the wire.
|
||||
``None`` if reserved size class (4) or ``hop_count * bytes_per_hop > max_path_size``
|
||||
— matching MeshCore where ``readFrom`` rejects the packet (no legacy reinterpretation).
|
||||
"""
|
||||
hop_count = path_len_byte & 63
|
||||
size_code = path_len_byte >> 6
|
||||
bytes_per_hop = size_code + 1 # 1, 2, 3, or 4
|
||||
if bytes_per_hop == 4:
|
||||
# Reserved in firmware, invalid
|
||||
return (path_len_byte, 1)
|
||||
return None
|
||||
path_byte_length = hop_count * bytes_per_hop
|
||||
if path_byte_length > max_path_size:
|
||||
return (path_len_byte, 1)
|
||||
return None
|
||||
return (path_byte_length, bytes_per_hop)
|
||||
|
||||
|
||||
@@ -505,7 +504,10 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s
|
||||
|
||||
path_len_byte = byte_data[offset]
|
||||
offset += 1
|
||||
path_byte_length, _ = decode_path_len_byte(path_len_byte)
|
||||
path_parts = decode_path_len_byte(path_len_byte)
|
||||
if path_parts is None:
|
||||
return "0000000000000000"
|
||||
path_byte_length, _ = path_parts
|
||||
|
||||
# Validate we have enough bytes for the path
|
||||
if len(byte_data) < offset + path_byte_length:
|
||||
@@ -541,6 +543,26 @@ def calculate_packet_hash(raw_hex: str, payload_type: Optional[int] = None) -> s
|
||||
return "0000000000000000"
|
||||
|
||||
|
||||
def verify_meshcore_advert_ed25519(mesh_payload: bytes) -> bool:
|
||||
"""Verify MeshCore ADVERT Ed25519 signature (layout from ``Mesh::createAdvert``).
|
||||
|
||||
Signed message is ``pub_key (32) + timestamp (4, LE) + app_data``; signature is
|
||||
``payload[36:100]`` (64 bytes); ``app_data`` starts at byte 100.
|
||||
"""
|
||||
if len(mesh_payload) < 100:
|
||||
return False
|
||||
try:
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
|
||||
|
||||
pub = mesh_payload[:32]
|
||||
msg = mesh_payload[:36] + mesh_payload[100:]
|
||||
sig = mesh_payload[36:100]
|
||||
Ed25519PublicKey.from_public_bytes(pub).verify(sig, msg)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
||||
"""Calculate haversine distance between two points in kilometers.
|
||||
|
||||
|
||||
+99
-21
@@ -28,6 +28,7 @@ from modules.utils import (
|
||||
parse_trace_payload_route_hashes,
|
||||
resolve_path,
|
||||
truncate_string,
|
||||
verify_meshcore_advert_ed25519,
|
||||
)
|
||||
|
||||
|
||||
@@ -185,50 +186,60 @@ class TestDecodePathLenByte:
|
||||
|
||||
def test_single_byte_one_hop(self):
|
||||
# size_code=0 -> 1 byte/hop, hop_count=1 -> 1 path byte
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x01)
|
||||
decoded = decode_path_len_byte(0x01)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 1
|
||||
assert bytes_per_hop == 1
|
||||
|
||||
def test_single_byte_three_hops(self):
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x03)
|
||||
decoded = decode_path_len_byte(0x03)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 3
|
||||
assert bytes_per_hop == 1
|
||||
|
||||
def test_multi_byte_two_bytes_per_hop_one_hop(self):
|
||||
# size_code=1 -> 2 bytes/hop, hop_count=1 -> 2 path bytes
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x41)
|
||||
decoded = decode_path_len_byte(0x41)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 2
|
||||
assert bytes_per_hop == 2
|
||||
|
||||
def test_multi_byte_two_bytes_per_hop_three_hops(self):
|
||||
# size_code=1, hop_count=3 -> 6 path bytes
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x43)
|
||||
decoded = decode_path_len_byte(0x43)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 6
|
||||
assert bytes_per_hop == 2
|
||||
|
||||
def test_three_bytes_per_hop(self):
|
||||
# size_code=2 -> 3 bytes/hop, hop_count=2 -> 6 path bytes
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x82)
|
||||
decoded = decode_path_len_byte(0x82)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 6
|
||||
assert bytes_per_hop == 3
|
||||
|
||||
def test_reserved_size_code_fallback(self):
|
||||
# size_code=3 (bytes_per_hop=4) is reserved -> legacy: path_len_byte as raw byte count, 1 byte/hop
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0xC2)
|
||||
assert path_byte_length == 0xC2 # raw byte value
|
||||
assert bytes_per_hop == 1
|
||||
def test_reserved_size_code_returns_none(self):
|
||||
# size_code=3 (bytes_per_hop=4) is reserved — firmware rejects
|
||||
assert decode_path_len_byte(0xC2) is None
|
||||
|
||||
def test_path_exceeds_max_fallback(self):
|
||||
# 32 hops * 2 bytes = 64, max_path_size=64 is ok; 33*2=66 > 64 -> legacy
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x41, max_path_size=64)
|
||||
def test_path_exceeds_max_returns_none(self):
|
||||
# 32 hops * 2 bytes = 64, max_path_size=64 is ok; 33*2=66 > 64 — reject
|
||||
decoded = decode_path_len_byte(0x41, max_path_size=64)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 2
|
||||
assert bytes_per_hop == 2
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x61, max_path_size=64) # 33 hops * 2
|
||||
assert path_byte_length == 0x61
|
||||
assert bytes_per_hop == 1
|
||||
assert decode_path_len_byte(0x61, max_path_size=64) is None # 33 hops * 2
|
||||
|
||||
def test_zero_hops(self):
|
||||
path_byte_length, bytes_per_hop = decode_path_len_byte(0x00)
|
||||
decoded = decode_path_len_byte(0x00)
|
||||
assert decoded is not None
|
||||
path_byte_length, bytes_per_hop = decoded
|
||||
assert path_byte_length == 0
|
||||
assert bytes_per_hop == 1
|
||||
|
||||
@@ -258,15 +269,18 @@ class TestEncodePathLenByte:
|
||||
|
||||
def test_three_hops_two_bytes_per_hop(self):
|
||||
assert encode_path_len_byte(3, 2) == 0x43
|
||||
pbl, bph = decode_path_len_byte(0x43)
|
||||
dec = decode_path_len_byte(0x43)
|
||||
assert dec is not None
|
||||
pbl, bph = dec
|
||||
assert pbl == 6
|
||||
assert bph == 2
|
||||
|
||||
def test_roundtrip_with_decode(self):
|
||||
for pb in (0x01, 0x03, 0x41, 0x43, 0x82, 0x00):
|
||||
pbl, bph = decode_path_len_byte(pb)
|
||||
if bph == 1 and pbl == pb:
|
||||
continue # legacy fallback path
|
||||
dec = decode_path_len_byte(pb)
|
||||
if dec is None:
|
||||
continue
|
||||
pbl, bph = dec
|
||||
hop_count = pb & 0x3F
|
||||
assert encode_path_len_byte(hop_count, bph) == pb
|
||||
|
||||
@@ -703,6 +717,64 @@ class TestFormatKeywordResponseWithPlaceholders:
|
||||
assert "SNR" in result
|
||||
assert "RSSI" in result
|
||||
|
||||
class TestVerifyMeshcoreAdvertEd25519:
|
||||
"""Tests for verify_meshcore_advert_ed25519() matching MeshCore createAdvert signing."""
|
||||
|
||||
def test_too_short_returns_false(self):
|
||||
assert verify_meshcore_advert_ed25519(b"\x00" * 99) is False
|
||||
|
||||
def test_valid_signed_payload_verifies(self):
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
||||
|
||||
priv = Ed25519PrivateKey.generate()
|
||||
pub = priv.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.Raw,
|
||||
format=serialization.PublicFormat.Raw,
|
||||
)
|
||||
ts = (1718319158).to_bytes(4, "little")
|
||||
app_data = b"\x92TestNode" # flags + short name; total mesh payload > 100
|
||||
msg = pub + ts + app_data
|
||||
sig = priv.sign(msg)
|
||||
mesh_payload = pub + ts + sig + app_data
|
||||
assert len(mesh_payload) >= 100
|
||||
assert verify_meshcore_advert_ed25519(mesh_payload) is True
|
||||
|
||||
def test_tampered_app_data_fails(self):
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
||||
|
||||
priv = Ed25519PrivateKey.generate()
|
||||
pub = priv.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.Raw,
|
||||
format=serialization.PublicFormat.Raw,
|
||||
)
|
||||
ts = (1).to_bytes(4, "little")
|
||||
app_data = b"\x01" * 8
|
||||
msg = pub + ts + app_data
|
||||
sig = priv.sign(msg)
|
||||
mesh_payload = bytearray(pub + ts + sig + app_data)
|
||||
mesh_payload[-1] ^= 0xFF
|
||||
assert verify_meshcore_advert_ed25519(bytes(mesh_payload)) is False
|
||||
|
||||
def test_tampered_signature_fails(self):
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
||||
|
||||
priv = Ed25519PrivateKey.generate()
|
||||
pub = priv.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.Raw,
|
||||
format=serialization.PublicFormat.Raw,
|
||||
)
|
||||
ts = (2).to_bytes(4, "little")
|
||||
app_data = b"\x02" * 8
|
||||
msg = pub + ts + app_data
|
||||
sig = priv.sign(msg)
|
||||
mesh_payload = bytearray(pub + ts + sig + app_data)
|
||||
mesh_payload[40] ^= 0xFF
|
||||
assert verify_meshcore_advert_ed25519(bytes(mesh_payload)) is False
|
||||
|
||||
|
||||
class TestCalculatePacketHashPathLength:
|
||||
"""Tests that calculate_packet_hash uses decode_path_len_byte so multi-byte paths skip correctly."""
|
||||
|
||||
@@ -786,6 +858,12 @@ class TestCalculatePacketHashEdgeCases:
|
||||
h = calculate_packet_hash("not-valid-hex!!!")
|
||||
assert h == "0000000000000000"
|
||||
|
||||
def test_invalid_path_len_encoding_returns_default_hash(self):
|
||||
"""Reserved path size class (hash_size==4) → strict decode fails → default hash."""
|
||||
# FLOOD + TXT_MSG, path_len byte 0xF5 (size code 3 → bytes_per_hop=4 reserved)
|
||||
h = calculate_packet_hash("09F500")
|
||||
assert h == "0000000000000000"
|
||||
|
||||
def test_non_transport_type_skips_transport_bytes(self):
|
||||
"""Route type 1 (FLOOD) → no transport bytes; hash succeeds."""
|
||||
# Header 0x09 = FLOOD + TXT_MSG, path_len=0, payload=0xFF
|
||||
|
||||
Reference in New Issue
Block a user