Enhance TRACE payload handling and path extraction in message processing

- Updated `update_mesh_graph_from_trace_data` to clarify the format of `path_hashes` as per-hop hash strings from the trace payload.
- Modified `MessageHandler` to differentiate between TRACE packets and regular transmissions, preventing incorrect extraction of repeater prefixes from RF path bytes.
- Introduced `parse_trace_payload_route_hashes` utility to extract TRACE route hash segments from payloads, ensuring accurate handling of path data.
- Enhanced `PacketCaptureService` to correctly populate packet information for TRACE packets, including SNR path and route hashes.
- Expanded test coverage for TRACE payload decoding and path extraction to validate functionality and correctness.

These changes improve the accuracy and reliability of TRACE data processing in the application.
This commit is contained in:
agessaman
2026-03-30 17:36:52 -07:00
parent 6a7a79af3c
commit f3e667e64f
7 changed files with 247 additions and 76 deletions
+2 -1
View File
@@ -25,7 +25,8 @@ def update_mesh_graph_from_trace_data(
Args:
bot: MeshCoreBot instance (mesh_graph, transmission_tracker, config, db_manager, meshcore).
path_hashes: List of node hash prefixes (1-byte each, as 2-char hex strings) from trace payload.
path_hashes: Per-hop hash strings from the trace payload (uppercase hex). Length in nibbles is
``2 * (1 << (flags & 3))`` per hop (1, 2, 4, or 8 bytes), not always 2 hex chars.
packet_info: Packet information dictionary (packet_hash optional; used when is_our_trace is None).
is_our_trace: If None, derived from packet_info['packet_hash'] and transmission_tracker.
If True/False, use that value (e.g. trace command sets True when TRACE_DATA matches our tag).
+141 -73
View File
@@ -709,96 +709,158 @@ class MessageHandler:
payload_type_value = int(payload_type_value)
packet_hash = calculate_packet_hash(packet_hex_for_hash, payload_type_value)
is_trace = decoded_packet.get('payload_type') == PayloadType.TRACE.value
# Check if this is a repeat of one of our transmissions
if (hasattr(self.bot, 'transmission_tracker') and
self.bot.transmission_tracker and
packet_hash and packet_hash != "0000000000000000"):
# Extract repeater prefixes from path - try multiple field names
# decode_meshcore_packet returns 'path' not 'path_nodes'
path_nodes = decoded_packet.get('path', [])
# Also try 'path_nodes' field (from routing_info)
if not path_nodes:
path_nodes = decoded_packet.get('path_nodes', [])
# TRACE: RF path bytes are per-hop SNR×4, not repeater hashes — do not
# extract prefixes or record repeats from them.
if not is_trace:
# Extract repeater prefixes from path - try multiple field names
# decode_meshcore_packet returns 'path' not 'path_nodes'
path_nodes = decoded_packet.get('path', [])
# Also try 'path_nodes' field (from routing_info)
if not path_nodes:
path_nodes = decoded_packet.get('path_nodes', [])
path_hex = decoded_packet.get('path_hex', '')
path_hex = decoded_packet.get('path_hex', '')
# If we don't have path_nodes but have path_hex, convert it
if not path_nodes and path_hex and len(path_hex) >= 2:
path_nodes = self._path_hex_to_nodes(path_hex)
# If we don't have path_nodes but have path_hex, convert it
if not path_nodes and path_hex and len(path_hex) >= 2:
path_nodes = self._path_hex_to_nodes(path_hex)
path_string = ','.join(path_nodes) if path_nodes else None
path_string = ','.join(path_nodes) if path_nodes else None
# Debug logging
if path_nodes:
self.logger.debug(f"📡 Extracting prefixes from path_nodes: {path_nodes}, path_hex: {path_hex}, bot_prefix: {self.bot.transmission_tracker.bot_prefix}")
# Debug logging
if path_nodes:
self.logger.debug(f"📡 Extracting prefixes from path_nodes: {path_nodes}, path_hex: {path_hex}, bot_prefix: {self.bot.transmission_tracker.bot_prefix}")
# Try to match this packet hash to a transmission
record = self.bot.transmission_tracker.match_packet_hash(
packet_hash, current_time
)
if record:
# This is one of our transmissions - check for repeats
# Extract repeater prefix from the last hop in the path
# (the repeater that sent this packet to us)
prefixes = self.bot.transmission_tracker.extract_repeater_prefixes_from_path(
path_string, path_nodes
# Try to match this packet hash to a transmission
record = self.bot.transmission_tracker.match_packet_hash(
packet_hash, current_time
)
# Log for debugging
if prefixes:
self.logger.info(f"📡 Found {len(prefixes)} repeater prefix(es) in repeat: {', '.join(prefixes)}")
elif path_nodes or path_hex:
self.logger.debug(f"📡 Repeat detected but no repeater prefixes extracted (path_nodes: {path_nodes}, path_hex: {path_hex}, bot_prefix: {self.bot.transmission_tracker.bot_prefix})")
if record:
# This is one of our transmissions - check for repeats
# Extract repeater prefix from the last hop in the path
# (the repeater that sent this packet to us)
prefixes = self.bot.transmission_tracker.extract_repeater_prefixes_from_path(
path_string, path_nodes
)
# Record the repeat
for prefix in prefixes:
self.bot.transmission_tracker.record_repeat(packet_hash, prefix)
# Log for debugging
if prefixes:
self.logger.info(f"📡 Found {len(prefixes)} repeater prefix(es) in repeat: {', '.join(prefixes)}")
elif path_nodes or path_hex:
self.logger.debug(f"📡 Repeat detected but no repeater prefixes extracted (path_nodes: {path_nodes}, path_hex: {path_hex}, bot_prefix: {self.bot.transmission_tracker.bot_prefix})")
# If no prefixes but we have a path, it might be a direct repeat
# (path contains our own node, so we filter it out)
if not prefixes and (path_nodes or path_hex):
# Still count as a repeat (heard by our radio)
self.bot.transmission_tracker.record_repeat(packet_hash, None)
# Record the repeat
for prefix in prefixes:
self.bot.transmission_tracker.record_repeat(packet_hash, prefix)
routing_info = {
'path_length': decoded_packet.get('path_len', 0),
'path_len_byte': decoded_packet.get('path_len_byte'),
'path_byte_length': decoded_packet.get('path_byte_length'),
'bytes_per_hop': decoded_packet.get('bytes_per_hop', 1),
'path_hex': decoded_packet.get('path_hex', ''),
'path_nodes': decoded_packet.get('path', []),
'route_type': decoded_packet.get('route_type_name', 'Unknown'),
'payload_length': payload_length, # Use the actual payload length
'payload_type': decoded_packet.get('payload_type_name', 'Unknown'),
'packet_hash': packet_hash # Store hash for packet tracking
}
# If no prefixes but we have a path, it might be a direct repeat
# (path contains our own node, so we filter it out)
if not prefixes and (path_nodes or path_hex):
# Still count as a repeat (heard by our radio)
self.bot.transmission_tracker.record_repeat(packet_hash, None)
else:
record = self.bot.transmission_tracker.match_packet_hash(
packet_hash, current_time
)
if record:
self.logger.debug(
"📡 TRACE packet matched our transmission; skipping repeater prefix "
"extraction (RF path holds SNR bytes, not node hashes)"
)
pi = decoded_packet.get('path_info') or {}
trace_route_hashes = list(pi.get('path_hashes') or pi.get('path') or [])
trace_snr_db = list(pi.get('snr_data') or [])
if is_trace:
routing_info = {
'path_length': len(trace_route_hashes) if trace_route_hashes else decoded_packet.get('path_len', 0),
'path_len_byte': decoded_packet.get('path_len_byte'),
'path_byte_length': decoded_packet.get('path_byte_length'),
'bytes_per_hop': decoded_packet.get('bytes_per_hop', 1),
'path_hex': decoded_packet.get('path_hex', ''),
'path_nodes': trace_route_hashes,
'trace_route_hashes': trace_route_hashes,
'trace_snr_db': trace_snr_db,
'trace_snr_path_hex': decoded_packet.get('path_hex', ''),
'route_type': decoded_packet.get('route_type_name', 'Unknown'),
'payload_length': payload_length,
'payload_type': decoded_packet.get('payload_type_name', 'Unknown'),
'packet_hash': packet_hash,
}
else:
routing_info = {
'path_length': decoded_packet.get('path_len', 0),
'path_len_byte': decoded_packet.get('path_len_byte'),
'path_byte_length': decoded_packet.get('path_byte_length'),
'bytes_per_hop': decoded_packet.get('bytes_per_hop', 1),
'path_hex': decoded_packet.get('path_hex', ''),
'path_nodes': decoded_packet.get('path', []),
'route_type': decoded_packet.get('route_type_name', 'Unknown'),
'payload_length': payload_length,
'payload_type': decoded_packet.get('payload_type_name', 'Unknown'),
'packet_hash': packet_hash,
}
# Validate path consistency (path_byte_length, path_hex, path_nodes, bytes_per_hop)
path_len = routing_info['path_length']
path_byte_len = routing_info.get('path_byte_length')
path_hex_str = routing_info.get('path_hex', '')
path_nodes_list = routing_info.get('path_nodes') or []
bph = routing_info.get('bytes_per_hop', 1) or 1
expected_hex_len = (path_byte_len * 2) if path_byte_len is not None else (path_len * bph * 2)
if path_len > 0 and path_hex_str:
if len(path_hex_str) != expected_hex_len:
self.logger.warning(
"Path length mismatch: path_hex has %d hex chars, expected %d (path_byte_length=%s, path_length=%s, bytes_per_hop=%s)",
len(path_hex_str), expected_hex_len, path_byte_len, path_len, bph
)
if path_nodes_list and len(path_nodes_list) != path_len:
self.logger.warning(
"Path nodes count mismatch: %d nodes, path_length=%d",
len(path_nodes_list), path_len
)
if path_nodes_list and bph >= 1 and any(len(str(n)) != bph * 2 for n in path_nodes_list):
self.logger.warning(
"Path node width mismatch: bytes_per_hop=%d expects %d hex chars per node, nodes=%s",
bph, bph * 2, path_nodes_list[:5]
)
if not is_trace:
path_len = routing_info['path_length']
path_byte_len = routing_info.get('path_byte_length')
path_hex_str = routing_info.get('path_hex', '')
path_nodes_list = routing_info.get('path_nodes') or []
bph = routing_info.get('bytes_per_hop', 1) or 1
expected_hex_len = (path_byte_len * 2) if path_byte_len is not None else (path_len * bph * 2)
if path_len > 0 and path_hex_str:
if len(path_hex_str) != expected_hex_len:
self.logger.warning(
"Path length mismatch: path_hex has %d hex chars, expected %d (path_byte_length=%s, path_length=%s, bytes_per_hop=%s)",
len(path_hex_str), expected_hex_len, path_byte_len, path_len, bph
)
if path_nodes_list and len(path_nodes_list) != path_len:
self.logger.warning(
"Path nodes count mismatch: %d nodes, path_length=%d",
len(path_nodes_list), path_len
)
if path_nodes_list and bph >= 1 and any(len(str(n)) != bph * 2 for n in path_nodes_list):
self.logger.warning(
"Path node width mismatch: bytes_per_hop=%d expects %d hex chars per node, nodes=%s",
bph, bph * 2, path_nodes_list[:5]
)
# Log the routing information for analysis
if routing_info['path_length'] > 0:
rf_path_bytes = decoded_packet.get('path_byte_length') or 0
trace_has_route = bool(trace_route_hashes)
trace_has_snr_path = rf_path_bytes > 0
if is_trace and (trace_has_route or trace_has_snr_path):
route_part = (
f"Trace route: {','.join(h.lower() for h in trace_route_hashes)}"
if trace_route_hashes
else "Trace route: (none decoded yet)"
)
snr_part = ""
if trace_snr_db:
snr_fmt = ",".join(f"{v:.2f}" for v in trace_snr_db)
snr_part = f" | Trace SNR (dB): {snr_fmt}"
elif routing_info.get('trace_snr_path_hex'):
snr_part = (
f" | Trace SNR path (raw hex, int8×4 per hop): "
f"{routing_info['trace_snr_path_hex']}"
)
hops_display = len(trace_route_hashes) if trace_route_hashes else decoded_packet.get('path_len', 0)
log_message = (
f"🛣️ ROUTING INFO: {routing_info['route_type']} | {route_part}{snr_part} "
f"({hops_display} route hops, {rf_path_bytes} RF path bytes) | "
f"Payload: {routing_info['payload_length']} bytes | Type: {routing_info['payload_type']}"
)
self.logger.info(log_message)
elif routing_info['path_length'] > 0:
# Use path_nodes when present (multi-byte); else chunk path_hex
path_nodes_list = routing_info.get('path_nodes') or []
if path_nodes_list:
@@ -818,6 +880,10 @@ class MessageHandler:
if (hasattr(self.bot, 'web_viewer_integration') and
self.bot.web_viewer_integration and
self.bot.web_viewer_integration.bot_integration):
decoded_packet['routing_info'] = routing_info
if is_trace and trace_route_hashes:
decoded_packet['path'] = list(trace_route_hashes)
decoded_packet['path_len'] = len(trace_route_hashes)
# Use extracted_payload which is the full MeshCore packet
# (header + path_len + path + payload, without RF wrapper)
decoded_packet['raw_packet_hex'] = extracted_payload if extracted_payload else raw_hex
@@ -1481,6 +1547,8 @@ class MessageHandler:
# Special handling for TRACE packets
if payload_type == PayloadType.TRACE:
# RF path bytes are per-hop SNR×4 (int8), not node hashes. The commanded route is
# in the payload after tag(4)+auth(4)+flags(1); use path_info / parse_trace_payload_route_hashes for display.
# In TRACE packets, path field contains SNR data
# Real routing path is in the payload as pathHashes (after tag(4) + auth(4) + flags(1))
snr_values = []
@@ -21,7 +21,7 @@ from meshcore import EventType
from ..enums import PayloadType, PayloadVersion, RouteType
# Import bot's utilities for packet hash
from ..utils import calculate_packet_hash, decode_path_len_byte
from ..utils import calculate_packet_hash, decode_path_len_byte, parse_trace_payload_route_hashes
# Import MQTT client
try:
@@ -907,7 +907,7 @@ class PacketCaptureService(BaseServicePlugin):
'path_byte_length': path_byte_length,
'bytes_per_hop': bytes_per_hop,
'path_hex': path_hex,
'path': path_nodes, # List of hex node IDs
'path': path_nodes, # For TRACE, RF path is SNR×4 per hop — replaced below
'payload_hex': packet_payload.hex(),
'payload_bytes': len(packet_payload),
'raw_hex': raw_hex,
@@ -916,6 +916,14 @@ class PacketCaptureService(BaseServicePlugin):
'transport_codes': transport_codes
}
# TRACE: RF path bytes are SNR samples; commanded route is in payload[9:]
if payload_type == PayloadType.TRACE:
packet_info['trace_snr_path_hex'] = path_hex.upper()
trace_route = parse_trace_payload_route_hashes(packet_payload)
if trace_route:
packet_info['path'] = trace_route
packet_info['path_len'] = len(trace_route)
return packet_info
except Exception as e:
+33
View File
@@ -416,6 +416,39 @@ def decode_path_len_byte(path_len_byte: int, max_path_size: int = 64) -> tuple:
return (path_byte_length, bytes_per_hop)
def parse_trace_payload_route_hashes(payload: bytes) -> list[str]:
"""Extract TRACE route hash segments from mesh payload (after tag, auth, flags).
Matches MeshCore: ``bytes_per_hash = 1 << (flags & 3)`` for bytes at ``payload[9:]``.
If the tail length is not a multiple of ``bytes_per_hash``, falls back to 1-byte
segments (same as MessageHandler._process_packet_path).
Args:
payload: Full mesh payload bytes (not including header/path).
Returns:
List of uppercase hex strings, one per hop hash.
"""
if len(payload) < 9:
return []
flags = payload[8]
path_hash_len = 1 << (flags & 3)
if path_hash_len <= 0:
path_hash_len = 1
path_hashes_bytes = payload[9:]
if not path_hashes_bytes:
return []
try:
if len(path_hashes_bytes) % path_hash_len == 0:
return [
path_hashes_bytes[i : i + path_hash_len].hex().upper()
for i in range(0, len(path_hashes_bytes), path_hash_len)
]
except Exception:
pass
return [f"{b:02X}" for b in path_hashes_bytes]
def encode_path_len_byte(hop_count: int, bytes_per_hop: int) -> int:
"""Pack hop count and hash size into the single path_len wire byte (inverse of decode_path_len_byte).
+26
View File
@@ -1136,6 +1136,32 @@ class TestDecodeMeshcorePacket:
assert result is not None
assert result["path_info"]["type"] == "trace"
def test_trace_decode_skye_sample_payload_hashes_not_snr_path(self, handler):
# Field report: RF path 31,28,23 was misread as node IDs; payload holds 37,d6,37.
raw = "26033128235F0AED1A000000000037D637"
result = handler.decode_meshcore_packet(raw)
assert result is not None
assert result["payload_type_name"] == "TRACE"
ph = result["path_info"].get("path_hashes") or result["path_info"].get("path")
assert ph == ["37", "D6", "37"]
assert len(result["path_info"]["snr_data"]) == 3
# Top-level path is still SNR chunks (legacy shape); real route is path_info
assert result["path"] # non-empty SNR-as-hex nodes
def test_trace_decode_flags_two_byte_path_hashes(self, handler):
# flags=1 → 2 bytes per hash; tail 4 bytes → AABB, CCDD
trace_payload = (
b"\x01\x00\x00\x00" # tag
+ b"\x00\x00\x00\x00" # auth
+ b"\x01" # flags: path_sz=1 → 2 bytes per hop
+ b"\xaa\xbb\xcc\xdd"
)
hex_str = _make_packet_hex(9, 2, path_bytes=b"", hop_count=0, payload_bytes=trace_payload)
result = handler.decode_meshcore_packet(hex_str)
assert result is not None
ph = result["path_info"].get("path_hashes") or result["path_info"].get("path")
assert ph == ["AABB", "CCDD"]
# ---------------------------------------------------------------------------
# parse_advert
+16
View File
@@ -16,6 +16,7 @@ from modules.utils import (
decode_path_len_byte,
encode_path_len_byte,
extract_path_node_ids_from_message,
parse_trace_payload_route_hashes,
format_elapsed_display,
format_keyword_response_with_placeholders,
format_location_for_display,
@@ -232,6 +233,21 @@ class TestDecodePathLenByte:
assert bytes_per_hop == 1
class TestParseTracePayloadRouteHashes:
"""Tests for parse_trace_payload_route_hashes() (TRACE payload tail, MeshCore firmware)."""
def test_skye_sample_tail(self):
payload = bytes.fromhex("5F0AED1A000000000037D637")
assert parse_trace_payload_route_hashes(payload) == ["37", "D6", "37"]
def test_two_byte_hashes_flags_1(self):
pl = b"\x00" * 8 + b"\x01" + b"\xaa\xbb\xcc\xdd"
assert parse_trace_payload_route_hashes(pl) == ["AABB", "CCDD"]
def test_short_payload_returns_empty(self):
assert parse_trace_payload_route_hashes(b"\x00" * 8) == []
class TestEncodePathLenByte:
"""Tests for encode_path_len_byte() (inverse of decode_path_len_byte for valid encodings)."""
@@ -0,0 +1,19 @@
"""Packet capture TRACE decode: path must come from payload, not RF SNR bytes."""
import logging
from modules.service_plugins.packet_capture_service import PacketCaptureService
def test_decode_packet_trace_uses_payload_route_hashes():
# Avoid full PacketCaptureService.__init__ (config/MQTT); only decode_packet needs logger + debug.
svc = object.__new__(PacketCaptureService)
svc.logger = logging.getLogger("test_packet_capture_trace_decode")
svc.debug = False
raw = "26033128235F0AED1A000000000037D637"
info = PacketCaptureService.decode_packet(svc, raw, {})
assert info is not None
assert info["payload_type"] == "TRACE"
assert info["path"] == ["37", "D6", "37"]
assert info["path_len"] == 3
assert info["trace_snr_path_hex"] == "312823"