Files
meshcore-bot/modules/trace_runner.py
agessaman 1c204dc389 Add trace command, configuration and documentation
- Introduced `[Trace_Command]` section in `config.ini.example` to enable link diagnostics with customizable parameters such as maximum hops, trace mode, and retry settings.
- Updated `command-reference.md` to include usage instructions and examples for the new `trace` and `tracer` commands, detailing their functionality for manual and round-trip tracing.
- Enhanced `mesh_graph.py` to support 2-byte trace confirmation for improved path weighting.
- Modified `message_handler.py` to update the mesh graph based on trace data, ensuring accurate edge representation in the graph.
- Added translations for the new trace commands in `en.json` to support user interaction in multiple languages.
2026-02-23 14:26:50 -08:00

170 lines
5.5 KiB
Python

#!/usr/bin/env python3
"""
Trace runner for MeshCore Bot.
Runs send_trace via MeshCore_py, waits for TRACE_DATA, and returns a structured result.
Supports configurable retries with delay between attempts. Shared by the trace command
and future automated mesh tracing service.
"""
import asyncio
import random
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from meshcore import EventType
@dataclass
class RunTraceResult:
"""Result of running a trace."""
success: bool
tag: int
path_nodes: List[Dict[str, Any]] = field(default_factory=list)
path_len: int = 0
flags: int = 0
error_message: Optional[str] = None
def _get_timeout_seconds(bot: Any, path: Optional[List[str]]) -> float:
"""Compute total timeout from path length and config."""
per_hop = bot.config.getfloat("Trace_Command", "timeout_per_hop_seconds", fallback=0.5)
base = bot.config.getfloat("Trace_Command", "timeout_base_seconds", fallback=1.0)
hops = len(path) if path else 0
# Typical: ~1s for 6 hops, ~2s for 10 hops; base + per-hop gives margin
total = base + max(1, hops) * per_hop
return total
async def _run_trace_attempt(
bot: Any,
path: Optional[List[str]],
path_string: Optional[str],
flags: int,
timeout_seconds: float,
tag: int,
) -> RunTraceResult:
"""Execute one trace attempt: send_trace then wait for TRACE_DATA. Caller handles retries."""
try:
if hasattr(bot, "transmission_tracker") and bot.transmission_tracker:
bot.transmission_tracker.record_transmission(
content="trace",
target="",
message_type="trace",
command_id=str(tag),
)
except Exception as e:
bot.logger.debug(f"Trace runner: failed to record transmission: {e}")
try:
result = await bot.meshcore.commands.send_trace(
auth_code=0,
tag=tag,
flags=flags,
path=path_string,
)
except Exception as e:
return RunTraceResult(success=False, tag=tag, error_message=str(e))
if result.type == EventType.ERROR:
reason = result.payload.get("reason", "unknown error")
return RunTraceResult(success=False, tag=tag, error_message=reason)
path_str = ",".join(path) if path else "(flood)"
try:
event = await bot.meshcore.wait_for_event(
EventType.TRACE_DATA,
attribute_filters={"tag": tag},
timeout=timeout_seconds,
)
except Exception as e:
return RunTraceResult(
success=False,
tag=tag,
error_message=f"Timeout or error waiting for trace (path: {path_str}): {e}",
)
if not event:
return RunTraceResult(
success=False,
tag=tag,
error_message=f"No trace response within timeout (path: {path_str})",
)
payload = event.payload or {}
path_nodes = payload.get("path") or []
path_len = payload.get("path_len", 0)
flags_val = payload.get("flags", 0)
return RunTraceResult(
success=True,
tag=tag,
path_nodes=path_nodes,
path_len=path_len,
flags=flags_val,
)
async def run_trace(
bot: Any,
path: Optional[List[str]] = None,
flags: int = 0,
timeout_seconds: Optional[float] = None,
) -> RunTraceResult:
"""
Send a trace and wait for TRACE_DATA. Retries on failure per config (default 2 attempts, 1s delay).
Args:
bot: MeshCoreBot instance (must have meshcore, config, transmission_tracker).
path: Optional list of 2-char hex node IDs (e.g. ["01", "7a", "55"]). None = flood.
flags: 8-bit flags for send_trace (0 = one_byte default).
timeout_seconds: Override; if None, uses base + (path hops * per_hop) from config.
Returns:
RunTraceResult with success, tag, path_nodes (hash/snr), path_len, flags, error_message.
"""
if not bot.meshcore or not getattr(bot.meshcore, "commands", None):
return RunTraceResult(
success=False,
tag=0,
error_message="Not connected or send_trace not available",
)
path_string = None
if path:
path_string = ",".join(p.strip().lower() for p in path if p and len(p.strip()) >= 2)
if timeout_seconds is None:
timeout_seconds = _get_timeout_seconds(bot, path)
max_attempts = max(1, bot.config.getint("Trace_Command", "trace_retry_count", fallback=2))
retry_delay = max(0.0, bot.config.getfloat("Trace_Command", "trace_retry_delay_seconds", fallback=1.0))
path_str_debug = path_string if path_string else "(flood)"
last_result: Optional[RunTraceResult] = None
for attempt in range(max_attempts):
tag = random.randint(1, 0xFFFFFFFF)
if attempt > 0:
bot.logger.debug("Trace retry %s/%s after %.1fs delay", attempt + 1, max_attempts, retry_delay)
await asyncio.sleep(retry_delay)
bot.logger.debug(
"Trace: path=%s hops=%s timeout=%.1fs tag=%s attempt=%s/%s",
path_str_debug,
len(path) if path else 0,
timeout_seconds,
tag,
attempt + 1,
max_attempts,
)
last_result = await _run_trace_attempt(
bot, path, path_string, flags, timeout_seconds, tag
)
if last_result.success:
return last_result
return last_result or RunTraceResult(
success=False,
tag=0,
error_message="Trace failed (no attempts run)",
)