Files
meshcore-bot/modules/commands/trace_command.py
agessaman 5a96dec558 Refactor TraceCommand to improve hop labeling logic
- Updated the TraceCommand class to enhance the labeling of hops in the trace output.
- Replaced direct hash retrieval with a conditional approach to determine the source label for each hop, improving clarity in the output format.
- Ensured consistent labeling for the first and last hops, enhancing the readability of the trace results.
2026-03-09 15:56:07 -07:00

247 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Trace and Tracer commands for the MeshCore Bot.
Link diagnostics: trace (manual path when given, reciprocal when no path); tracer (reciprocal so return is heard by bot).
"""
import asyncio
import re
from typing import List, Optional
from .base_command import BaseCommand
from ..models import MeshMessage
from ..trace_runner import run_trace, RunTraceResult
from ..graph_trace_helper import update_mesh_graph_from_trace_data
class TraceCommand(BaseCommand):
"""Trace (manual path) and Tracer (reciprocal path) for link diagnostics."""
name = "trace"
keywords = ["trace", "tracer"]
description = "Run a trace along a path (trace=manual if path given, else round-trip; tracer=always round-trip)"
requires_dm = False
cooldown_seconds = 2
category = "meshcore_info"
short_description = "Run link trace (manual or reciprocal path)"
usage = "trace [path] or tracer [path]"
examples = ["trace 01,7a,55", "tracer", "tracer 01,7a,55"]
def __init__(self, bot):
super().__init__(bot)
self.trace_enabled = self.get_config_value("Trace_Command", "enabled", fallback=True, value_type="bool")
self.maximum_hops = self.bot.config.getint("Trace_Command", "maximum_hops", fallback=5)
self.trace_mode = (self.bot.config.get("Trace_Command", "trace_mode", fallback="one_byte") or "one_byte").strip().lower()
self.timeout_per_hop = self.bot.config.getfloat("Trace_Command", "timeout_per_hop_seconds", fallback=1.5)
self.update_graph_one_byte = self.bot.config.getboolean(
"Trace_Command", "update_graph_one_byte", fallback=True
)
self.update_graph_two_byte = self.bot.config.getboolean(
"Trace_Command", "update_graph_two_byte", fallback=True
)
# Optional: single emoji or string for bot in trace output; unset/empty = "[Bot]"
self.bot_label = (self.bot.config.get("Trace_Command", "bot_label", fallback="") or "").strip()
if not self.bot_label:
self.bot_label = "[Bot]"
output_fmt = (self.bot.config.get("Trace_Command", "output_format", fallback="inline") or "inline").strip().lower()
self.output_format = output_fmt if output_fmt in ("inline", "vertical") else "inline"
def can_execute(self, message: MeshMessage) -> bool:
if not self.trace_enabled:
return False
return super().can_execute(message)
def get_help_text(self) -> str:
return (
"trace [path] — run trace along path (return may not be heard). No path = round-trip like tracer. "
"tracer [path] — round-trip so bot hears return. Path: comma 2-char hex (e.g. 01,7a,55). "
"No path = use your message path (round-trip)."
)
def matches_keyword(self, message: MeshMessage) -> bool:
content = message.content.strip()
if content.startswith("!"):
content = content[1:].strip()
c = content.lower()
if c == "trace" or c == "tracer":
return True
if c.startswith("trace ") or c.startswith("tracer "):
return True
return False
def _extract_path_from_message(self, message: MeshMessage) -> List[str]:
"""Extract path node IDs from message.path (supports 1-hop and multi-hop)."""
if not message.path:
return []
if "Direct" in message.path or "0 hops" in message.path:
return []
path_string = message.path
if " via ROUTE_TYPE_" in path_string:
path_string = path_string.split(" via ROUTE_TYPE_")[0]
if "(" in path_string:
path_string = path_string.split("(")[0].strip()
path_string = path_string.strip()
# Single node (e.g. "01 (1 hop)") has no comma
if "," not in path_string:
if len(path_string) == 2 and all(c in "0123456789abcdefABCDEF" for c in path_string):
return [path_string.lower()]
return []
parts = path_string.split(",")
valid = []
for part in parts:
part = part.strip()
if len(part) == 2 and all(c in "0123456789abcdefABCDEF" for c in part):
valid.append(part.lower())
return valid
def _parse_path_arg(self, content: str) -> Optional[List[str]]:
"""Parse path from command content after 'trace ' or 'tracer '.
Accepts: comma-separated 2-char hex (01,7a,55), contiguous hex (01e07a), or mixed (01,e001).
Returns list of 2-char hex bytes, or None if no path args / invalid.
"""
content = content.strip()
if content.startswith("!"):
content = content[1:].strip()
rest = ""
for kw in ["tracer ", "trace "]:
if content.lower().startswith(kw):
rest = content[len(kw) :].strip()
break
if not rest:
return None
# Normalize: drop commas and spaces, single string of hex chars
hex_chars = re.sub(r"[\s,]+", "", rest).lower()
if not hex_chars or len(hex_chars) % 2 != 0:
return None
if not all(c in "0123456789abcdef" for c in hex_chars):
return None
# Split into 2-char (1-byte) nodes
return [hex_chars[i : i + 2] for i in range(0, len(hex_chars), 2)]
def _build_reciprocal_path(self, nodes: List[str]) -> List[str]:
"""Build round-trip path: [01,7a,55] -> [01,7a,55,7a,01] (out then back without duplicating destination)."""
if not nodes or len(nodes) < 2:
return list(nodes)
# Return path: reverse of path excluding last node (so we don't duplicate the far end)
return nodes + list(reversed(nodes[:-1]))
def _format_trace_result(self, message: MeshMessage, result: RunTraceResult) -> str:
"""Format trace result as inline (default) or vertical per output_format config."""
sender_str = f"@[{message.sender_id}] "
if not result.success:
return f"{sender_str}Trace failed: {result.error_message or 'unknown'}"
if self.output_format == "vertical":
return self._format_trace_vertical(sender_str, result)
return self._format_trace_inline(sender_str, result)
def _format_trace_inline(self, sender_str: str, result: RunTraceResult) -> str:
"""Single chain: bot_label SNR [node] ... SNR bot_label."""
parts = [self.bot_label]
for node in result.path_nodes:
s = node.get("snr")
h = node.get("hash")
if h:
node_str = f"[{h}]"
else:
node_str = self.bot_label
if s is not None:
parts.append(f"{s:.1f}")
parts.append(node_str)
return sender_str + " ".join(parts)
def _format_trace_vertical(self, sender_str: str, result: RunTraceResult) -> str:
"""Vertical format: one line per hop, 'from → snr →' (next line's from); 'db' only on first SNR."""
lines = [f"{sender_str}Trace:"]
nodes = result.path_nodes
for i, node in enumerate(nodes):
s = node.get("snr")
# Left label = source of this hop: bot for first hop, else previous node's hash
if i == 0:
from_label = self.bot_label
else:
prev_h = nodes[i - 1].get("hash")
from_label = prev_h if prev_h else ""
if i == 0:
# First hop: bot → SNR db →
snr_str = f"{s:.1f} db" if s is not None else ""
lines.append(f"{from_label}{snr_str}")
elif i == len(nodes) - 1:
# Last hop: from_node → SNR → bot
snr_str = f"{s:.1f}" if s is not None else ""
lines.append(f"{from_label}{snr_str}{self.bot_label}")
else:
snr_str = f"{s:.1f}" if s is not None else ""
lines.append(f"{from_label}{snr_str}")
return "\n".join(lines)
async def execute(self, message: MeshMessage) -> bool:
content = message.content.strip()
if content.startswith("!"):
content = content[1:].strip()
is_tracer = content.lower().startswith("tracer")
path_arg = self._parse_path_arg(message.content)
if path_arg is not None:
path_nodes = path_arg[: self.maximum_hops]
else:
path_nodes = self._extract_path_from_message(message)
if not path_nodes:
await self.send_response(
message,
"Trace/tracer need a path (e.g. trace 01,7a,55) or a message that has a path.",
)
return True
path_nodes = path_nodes[: self.maximum_hops]
# Incoming path is sender → ... → bot; reverse so we have path from bot toward sender
path_nodes = list(reversed(path_nodes))
# Use reciprocal path for tracer (always) or when no path given (so round-trip completes)
if is_tracer or path_arg is None:
path_nodes = self._build_reciprocal_path(path_nodes)
# Do not cap here: outbound was already capped above; truncating would drop the return path
if not self.bot.connected or not self.bot.meshcore:
await self.send_response(message, "Not connected to radio.")
return True
if not getattr(self.bot.meshcore, "commands", None) or not hasattr(
self.bot.meshcore.commands, "send_trace"
):
await self.send_response(message, "Trace not available (firmware/connection).")
return True
flags = 0
if self.trace_mode == "two_byte":
flags = 1 # Reserve flag for 2-byte when supported
result = await run_trace(
self.bot,
path=path_nodes,
flags=flags,
timeout_seconds=None,
)
response = self._format_trace_result(message, result)
sent = await self.send_response(message, response)
# If rate limited (e.g. after retry), wait and retry once so the user gets the result
if not sent and hasattr(self.bot, "command_manager") and self.bot.command_manager:
wait_s = self.bot.command_manager.get_rate_limit_wait_seconds(
self.bot.command_manager.get_rate_limit_key(message)
)
if wait_s > 0.1:
await asyncio.sleep(wait_s)
await self.send_response(message, response)
if result.success and self.update_graph_one_byte and result.path_nodes:
path_hashes = [n.get("hash") for n in result.path_nodes if n.get("hash")]
if path_hashes and hasattr(self.bot, "mesh_graph") and self.bot.mesh_graph and self.bot.mesh_graph.capture_enabled:
update_mesh_graph_from_trace_data(
self.bot,
path_hashes,
{},
is_our_trace=True,
)
return True