Files
meshcore-bot/modules/trace_runner.py
Stacy Olivas ce884cee87 fix: auth, db migrations, retry, chunking, socket race, trace, timezone, repeater, and ruff/mypy cleanup
BUG-001: web viewer login/session auth (in web viewer commit)
BUG-002: db_manager ALTER TABLE for missing channel_operations and
  feed_message_queue columns on startup
BUG-015: scheduler thread blocked on future.result(); replaced all
  blocking waits with add_done_callback (fire-and-forget)
BUG-016: reboot_radio sends meshcore.commands.reboot() before disconnect
BUG-017: radio disconnect uses asyncio.wait_for(timeout=10)
BUG-022: custom asyncio loop exception handler suppresses IndexError
  from meshcore parser at DEBUG level
BUG-024: last_db_backup_run updated after each run; 2-min startup
  window; last-run seeded from DB on restart
BUG-025: send_channel_message retries up to 2 times (2s delay) on
  no_event_received via _is_no_event_received() helper
BUG-026: split_text_into_chunks() and get_max_message_length() added
  to CommandManager; keyword dispatch uses send_response_chunked()
BUG-028: byte_data = b"" initialised before try block in
  decode_meshcore_packet to prevent UnboundLocalError in except handler
TraceCommand: path nodes reversed and return path truncated; fixed
format_elapsed_display: UTC normalisation before elapsed computation (#75)
RepeaterManager: auto_manage_contacts guard before any purge logic (#50)
Command aliases: [Aliases] config section injects shorthands at startup
JSON logging: _JsonFormatter; json_logging = true in [Logging]
Structured JSON logging compatible with Loki, Elasticsearch, Splunk
Discord bridge, Telegram bridge, and all service plugins updated
MeshGraph edge promotion logic corrected
Shutdown: scheduler and meshcore disconnect joined cleanly; log spam fixed
All modules: ruff and mypy cleanup applied (type annotations, imports)
2026-03-17 18:07:18 -07:00

170 lines
5.5 KiB
Python

#!/usr/bin/env python3
"""
Trace runner for MeshCore Bot.
Runs send_trace via MeshCore_py, waits for TRACE_DATA, and returns a structured result.
Supports configurable retries with delay between attempts. Shared by the trace command
and future automated mesh tracing service.
"""
import asyncio
import random
from dataclasses import dataclass, field
from typing import Any, Optional
from meshcore import EventType
@dataclass
class RunTraceResult:
"""Result of running a trace."""
success: bool
tag: int
path_nodes: list[dict[str, Any]] = field(default_factory=list)
path_len: int = 0
flags: int = 0
error_message: Optional[str] = None
def _get_timeout_seconds(bot: Any, path: Optional[list[str]]) -> float:
"""Compute total timeout from path length and config."""
per_hop = bot.config.getfloat("Trace_Command", "timeout_per_hop_seconds", fallback=0.5)
base = bot.config.getfloat("Trace_Command", "timeout_base_seconds", fallback=1.0)
hops = len(path) if path else 0
# Typical: ~1s for 6 hops, ~2s for 10 hops; base + per-hop gives margin
total = base + max(1, hops) * per_hop
return total
async def _run_trace_attempt(
bot: Any,
path: Optional[list[str]],
path_string: Optional[str],
flags: int,
timeout_seconds: float,
tag: int,
) -> RunTraceResult:
"""Execute one trace attempt: send_trace then wait for TRACE_DATA. Caller handles retries."""
try:
if hasattr(bot, "transmission_tracker") and bot.transmission_tracker:
bot.transmission_tracker.record_transmission(
content="trace",
target="",
message_type="trace",
command_id=str(tag),
)
except Exception as e:
bot.logger.debug(f"Trace runner: failed to record transmission: {e}")
try:
result = await bot.meshcore.commands.send_trace(
auth_code=0,
tag=tag,
flags=flags,
path=path_string,
)
except Exception as e:
return RunTraceResult(success=False, tag=tag, error_message=str(e))
if result.type == EventType.ERROR:
reason = result.payload.get("reason", "unknown error")
return RunTraceResult(success=False, tag=tag, error_message=reason)
path_str = ",".join(path) if path else "(flood)"
try:
event = await bot.meshcore.wait_for_event(
EventType.TRACE_DATA,
attribute_filters={"tag": tag},
timeout=timeout_seconds,
)
except Exception as e:
return RunTraceResult(
success=False,
tag=tag,
error_message=f"Timeout or error waiting for trace (path: {path_str}): {e}",
)
if not event:
return RunTraceResult(
success=False,
tag=tag,
error_message=f"No trace response within timeout (path: {path_str})",
)
payload = event.payload or {}
path_nodes = payload.get("path") or []
path_len = payload.get("path_len", 0)
flags_val = payload.get("flags", 0)
return RunTraceResult(
success=True,
tag=tag,
path_nodes=path_nodes,
path_len=path_len,
flags=flags_val,
)
async def run_trace(
bot: Any,
path: Optional[list[str]] = None,
flags: int = 0,
timeout_seconds: Optional[float] = None,
) -> RunTraceResult:
"""
Send a trace and wait for TRACE_DATA. Retries on failure per config (default 2 attempts, 1s delay).
Args:
bot: MeshCoreBot instance (must have meshcore, config, transmission_tracker).
path: Optional list of 2-char hex node IDs (e.g. ["01", "7a", "55"]). None = flood.
flags: 8-bit flags for send_trace (0 = one_byte default).
timeout_seconds: Override; if None, uses base + (path hops * per_hop) from config.
Returns:
RunTraceResult with success, tag, path_nodes (hash/snr), path_len, flags, error_message.
"""
if not bot.meshcore or not getattr(bot.meshcore, "commands", None):
return RunTraceResult(
success=False,
tag=0,
error_message="Not connected or send_trace not available",
)
path_string = None
if path:
path_string = ",".join(p.strip().lower() for p in path if p and len(p.strip()) >= 2)
if timeout_seconds is None:
timeout_seconds = _get_timeout_seconds(bot, path)
max_attempts = max(1, bot.config.getint("Trace_Command", "trace_retry_count", fallback=2))
retry_delay = max(0.0, bot.config.getfloat("Trace_Command", "trace_retry_delay_seconds", fallback=1.0))
path_str_debug = path_string if path_string else "(flood)"
last_result: Optional[RunTraceResult] = None
for attempt in range(max_attempts):
tag = random.randint(1, 0xFFFFFFFF)
if attempt > 0:
bot.logger.debug("Trace retry %s/%s after %.1fs delay", attempt + 1, max_attempts, retry_delay)
await asyncio.sleep(retry_delay)
bot.logger.debug(
"Trace: path=%s hops=%s timeout=%.1fs tag=%s attempt=%s/%s",
path_str_debug,
len(path) if path else 0,
timeout_seconds,
tag,
attempt + 1,
max_attempts,
)
last_result = await _run_trace_attempt(
bot, path, path_string, flags, timeout_seconds, tag
)
if last_result.success:
return last_result
return last_result or RunTraceResult(
success=False,
tag=0,
error_message="Trace failed (no attempts run)",
)