mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
- Updated the TraceCommand class to enhance the labeling of hops in the trace output. - Replaced direct hash retrieval with a conditional approach to determine the source label for each hop, improving clarity in the output format. - Ensured consistent labeling for the first and last hops, enhancing the readability of the trace results.
247 lines
11 KiB
Python
247 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Trace and Tracer commands for the MeshCore Bot.
|
|
Link diagnostics: trace (manual path when given, reciprocal when no path); tracer (reciprocal so return is heard by bot).
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
from typing import List, Optional
|
|
|
|
from .base_command import BaseCommand
|
|
from ..models import MeshMessage
|
|
from ..trace_runner import run_trace, RunTraceResult
|
|
from ..graph_trace_helper import update_mesh_graph_from_trace_data
|
|
|
|
|
|
class TraceCommand(BaseCommand):
    """Trace (manual path) and Tracer (reciprocal path) for link diagnostics.

    Two keywords are handled by this command:

    * ``trace [path]``  - trace along the given path as-is; when no path is
      given, the sender's message path is used and made round-trip.
    * ``tracer [path]`` - always builds a round-trip (out-and-back) path.

    A path argument is a sequence of 2-character hex node IDs, either
    comma-separated (``01,7a,55``), contiguous (``017a55``) or mixed.
    """

    name = "trace"
    keywords = ["trace", "tracer"]
    description = "Run a trace along a path (trace=manual if path given, else round-trip; tracer=always round-trip)"
    requires_dm = False
    cooldown_seconds = 2
    category = "meshcore_info"

    short_description = "Run link trace (manual or reciprocal path)"
    usage = "trace [path] or tracer [path]"
    examples = ["trace 01,7a,55", "tracer", "tracer 01,7a,55"]

    # Exactly one node ID: two hex digits, either case.
    _HEX_BYTE_RE = re.compile(r"[0-9a-fA-F]{2}")
    # A non-empty, even-length run of lowercase hex digits (one or more node IDs).
    _HEX_PATH_RE = re.compile(r"(?:[0-9a-f]{2})+")

    def __init__(self, bot):
        """Read the ``[Trace_Command]`` configuration section.

        Args:
            bot: The bot instance; its ``config`` object is queried for settings.
        """
        super().__init__(bot)
        config = self.bot.config
        section = "Trace_Command"

        self.trace_enabled = self.get_config_value(section, "enabled", fallback=True, value_type="bool")
        # Clamp to at least one hop: a value <= 0 would make the path slices in
        # execute() produce an empty (or tail-truncated) path.
        self.maximum_hops = max(1, config.getint(section, "maximum_hops", fallback=5))
        self.trace_mode = (config.get(section, "trace_mode", fallback="one_byte") or "one_byte").strip().lower()
        # NOTE(review): timeout_per_hop and update_graph_two_byte are read here but
        # are not referenced anywhere else in this class.
        self.timeout_per_hop = config.getfloat(section, "timeout_per_hop_seconds", fallback=1.5)
        self.update_graph_one_byte = config.getboolean(section, "update_graph_one_byte", fallback=True)
        self.update_graph_two_byte = config.getboolean(section, "update_graph_two_byte", fallback=True)

        # Optional: single emoji or string for bot in trace output; unset/empty = "[Bot]"
        self.bot_label = (config.get(section, "bot_label", fallback="") or "").strip() or "[Bot]"

        # Unknown output formats silently fall back to "inline".
        output_fmt = (config.get(section, "output_format", fallback="inline") or "inline").strip().lower()
        self.output_format = output_fmt if output_fmt in ("inline", "vertical") else "inline"

    def can_execute(self, message: MeshMessage) -> bool:
        """Return False when the command is disabled, else defer to the base class."""
        if not self.trace_enabled:
            return False
        return super().can_execute(message)

    def get_help_text(self) -> str:
        """Return the user-facing help text for ``trace`` and ``tracer``."""
        return (
            "trace [path] — run trace along path (return may not be heard). No path = round-trip like tracer. "
            "tracer [path] — round-trip so bot hears return. Path: comma 2-char hex (e.g. 01,7a,55). "
            "No path = use your message path (round-trip)."
        )

    @staticmethod
    def _normalize_content(content: str) -> str:
        """Strip surrounding whitespace and one optional leading ``!`` prefix."""
        text = content.strip()
        if text.startswith("!"):
            text = text[1:].strip()
        return text

    def _argument_text(self, content: str) -> str:
        """Return the text following ``tracer `` / ``trace ``, or ``""`` if none."""
        text = self._normalize_content(content)
        lowered = text.lower()
        # "tracer " must be tested first; "trace " is not a prefix of it, but
        # keeping the longer keyword first mirrors the matching order callers expect.
        for kw in ("tracer ", "trace "):
            if lowered.startswith(kw):
                return text[len(kw):].strip()
        return ""

    @classmethod
    def _is_hex_byte(cls, text: str) -> bool:
        """Return True if *text* is exactly two hex digits."""
        return cls._HEX_BYTE_RE.fullmatch(text) is not None

    def matches_keyword(self, message: MeshMessage) -> bool:
        """Return True for ``trace`` / ``tracer``, bare or followed by a space and arguments."""
        lowered = self._normalize_content(message.content).lower()
        return lowered in ("trace", "tracer") or lowered.startswith(("trace ", "tracer "))

    def _extract_path_from_message(self, message: MeshMessage) -> List[str]:
        """Extract path node IDs from message.path (supports 1-hop and multi-hop).

        Returns:
            Lowercased 2-char hex node IDs in the order they appear in
            ``message.path``; an empty list when the path is missing, marked
            "Direct" / "0 hops", or contains no valid node IDs.
        """
        raw_path = message.path
        if not raw_path:
            return []
        if "Direct" in raw_path or "0 hops" in raw_path:
            return []

        # Drop a trailing " via ROUTE_TYPE_..." suffix and any "(...)" annotation.
        path_string = raw_path.split(" via ROUTE_TYPE_")[0]
        path_string = path_string.split("(")[0].strip()

        # A single node (e.g. "01 (1 hop)") has no comma; split() then yields one
        # part, so the same filter covers both the 1-hop and multi-hop cases.
        candidates = (part.strip() for part in path_string.split(","))
        return [part.lower() for part in candidates if self._is_hex_byte(part)]

    def _parse_path_arg(self, content: str) -> Optional[List[str]]:
        """Parse path from command content after 'trace ' or 'tracer '.

        Accepts: comma-separated 2-char hex (01,7a,55), contiguous hex (01e07a), or mixed (01,e001).

        Returns:
            List of lowercase 2-char hex bytes, or None if there are no path
            args or they are invalid (non-hex characters or an odd digit count).
        """
        rest = self._argument_text(content)
        if not rest:
            return None
        # Normalize: drop commas and whitespace, leaving a single string of hex chars
        hex_chars = re.sub(r"[\s,]+", "", rest).lower()
        if self._HEX_PATH_RE.fullmatch(hex_chars) is None:
            return None
        # Split into 2-char (1-byte) nodes
        return [hex_chars[i : i + 2] for i in range(0, len(hex_chars), 2)]

    def _build_reciprocal_path(self, nodes: List[str]) -> List[str]:
        """Build round-trip path: [01,7a,55] -> [01,7a,55,7a,01] (out then back without duplicating destination).

        Paths with fewer than two nodes are returned as a copy, unchanged.
        """
        if len(nodes) < 2:
            return list(nodes)
        # Return leg: reverse of the path excluding the last node (so the far end is not duplicated)
        return nodes + nodes[-2::-1]

    def _format_trace_result(self, message: MeshMessage, result: RunTraceResult) -> str:
        """Format trace result as inline (default) or vertical per output_format config.

        A failed result is rendered as a "Trace failed" line carrying the
        result's error message (or 'unknown').
        """
        sender_str = f"@[{message.sender_id}] "
        if not result.success:
            return f"{sender_str}Trace failed: {result.error_message or 'unknown'}"
        if self.output_format == "vertical":
            return self._format_trace_vertical(sender_str, result)
        return self._format_trace_inline(sender_str, result)

    def _format_trace_inline(self, sender_str: str, result: RunTraceResult) -> str:
        """Single chain: bot_label SNR [node] ... SNR bot_label.

        Each entry of ``result.path_nodes`` contributes its SNR (when present)
        followed by ``[hash]``; an entry without a hash is shown as the bot label.
        """
        parts = [self.bot_label]
        for node in result.path_nodes:
            snr = node.get("snr")
            node_hash = node.get("hash")
            if snr is not None:
                parts.append(f"{snr:.1f}")
            parts.append(f"[{node_hash}]" if node_hash else self.bot_label)
        return sender_str + " ".join(parts)

    def _format_trace_vertical(self, sender_str: str, result: RunTraceResult) -> str:
        """Vertical format: one line per hop, 'from → snr →' (next line's from); 'db' only on first SNR."""
        lines = [f"{sender_str}Trace:"]
        nodes = result.path_nodes
        last_index = len(nodes) - 1
        for i, node in enumerate(nodes):
            snr = node.get("snr")
            if i == 0:
                # First hop: bot → SNR db →
                snr_str = f"{snr:.1f} db" if snr is not None else "—"
                lines.append(f"{self.bot_label} → {snr_str} →")
                continue

            # Left label = source of this hop, i.e. the previous node's hash
            prev_hash = nodes[i - 1].get("hash")
            from_label = prev_hash if prev_hash else "—"
            snr_str = f"{snr:.1f}" if snr is not None else "—"
            if i == last_index:
                # Last hop: from_node → SNR → bot
                lines.append(f"{from_label} → {snr_str} → {self.bot_label}")
            else:
                lines.append(f"{from_label} → {snr_str} →")
        return "\n".join(lines)

    async def execute(self, message: MeshMessage) -> bool:
        """Run a trace for *message* and send the formatted result back.

        The path comes from the command argument when one is given, otherwise
        from the message's own path (reversed, so it runs from the bot toward
        the sender). ``tracer``, or any invocation without an explicit path,
        uses a round-trip path.

        Returns:
            Always True; problems are reported to the user via send_response.
        """
        content = self._normalize_content(message.content)
        is_tracer = content.lower().startswith("tracer")

        path_arg = self._parse_path_arg(message.content)
        if path_arg is None and self._argument_text(message.content):
            # An argument was supplied but is not a valid hex path. Report it
            # instead of silently tracing the sender's message path, which would
            # return a plausible-looking result for a path the user did not ask for.
            await self.send_response(
                message,
                "Invalid path. Use comma-separated 2-char hex node IDs (e.g. trace 01,7a,55).",
            )
            return True

        if path_arg is not None:
            path_nodes = path_arg[: self.maximum_hops]
        else:
            path_nodes = self._extract_path_from_message(message)
            if not path_nodes:
                await self.send_response(
                    message,
                    "Trace/tracer need a path (e.g. trace 01,7a,55) or a message that has a path.",
                )
                return True
            path_nodes = path_nodes[: self.maximum_hops]
            # Incoming path is sender → ... → bot; reverse so we have path from bot toward sender
            path_nodes = list(reversed(path_nodes))

        # Use reciprocal path for tracer (always) or when no path given (so round-trip completes)
        if is_tracer or path_arg is None:
            path_nodes = self._build_reciprocal_path(path_nodes)
            # Do not cap here: outbound was already capped above; truncating would drop the return path

        if not self.bot.connected or not self.bot.meshcore:
            await self.send_response(message, "Not connected to radio.")
            return True

        commands = getattr(self.bot.meshcore, "commands", None)
        if not commands or not hasattr(commands, "send_trace"):
            await self.send_response(message, "Trace not available (firmware/connection).")
            return True

        # Reserve flag 1 for 2-byte mode when supported
        flags = 1 if self.trace_mode == "two_byte" else 0

        result = await run_trace(
            self.bot,
            path=path_nodes,
            flags=flags,
            timeout_seconds=None,
        )

        response = self._format_trace_result(message, result)
        sent = await self.send_response(message, response)
        # If rate limited (e.g. after retry), wait and retry once so the user gets the result
        command_manager = getattr(self.bot, "command_manager", None)
        if not sent and command_manager:
            wait_s = command_manager.get_rate_limit_wait_seconds(
                command_manager.get_rate_limit_key(message)
            )
            if wait_s > 0.1:
                await asyncio.sleep(wait_s)
                await self.send_response(message, response)

        # Feed the observed hop hashes into the mesh graph when capture is enabled.
        # NOTE(review): this is gated on update_graph_one_byte regardless of trace_mode.
        if result.success and self.update_graph_one_byte and result.path_nodes:
            path_hashes = [n.get("hash") for n in result.path_nodes if n.get("hash")]
            mesh_graph = getattr(self.bot, "mesh_graph", None)
            if path_hashes and mesh_graph and mesh_graph.capture_enabled:
                update_mesh_graph_from_trace_data(
                    self.bot,
                    path_hashes,
                    {},
                    is_our_trace=True,
                )

        return True
|