Files
meshcore-bot/modules/commands/multitest_command.py
agessaman 43f450e702 Update path formatting and message handling in commands
- Modified path formatting in `multitest_command.py` to use ideographic space for nested branches, enhancing visual clarity.
- Updated the `schedule_command.py` to strip control characters from messages, ensuring safe and clean previews.
- Adjusted test cases in `test_multitest_command.py` to reflect changes in path formatting, improving consistency across tests.

These changes enhance the readability of path outputs and improve message handling in scheduled commands.
2026-03-29 20:27:18 -07:00

773 lines
35 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Multitest command for the MeshCore Bot
Listens for a period of time and collects all unique paths from incoming messages
"""
import asyncio
import re
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional
from ..models import MeshMessage
from ..utils import calculate_packet_hash, parse_path_string
from .base_command import BaseCommand
# Glyphs used when rendering condensed path trees. They are written as escapes
# so the source stays readable even where the characters are invisible.
_BRANCH_INTER = "\u251c" # ├ (intermediate branch)
_BRANCH_LAST = "\u2514" # └ (last branch)
_INDENT_NEST = "\u3000" # U+3000 ideographic space, emitted before nested ├/└
_BRANCH_CORNER = "\u2510" # ┐ (marks end of common path before branches)
def _tree_branch_lines_flat(suffixes: list[str]) -> list[str]:
"""Format branch rows: ├ for all but the last, └ for the last; space after the tee only."""
if not suffixes:
return []
n = len(suffixes)
out: list[str] = []
for i, text in enumerate(suffixes):
prefix = _BRANCH_LAST if i == n - 1 else _BRANCH_INTER
out.append(f"{prefix} {text}")
return out
def _grouped_suffix_line_specs(non_empty: list[list[str]]) -> list[tuple[str, str]]:
"""Build (line_kind, text) rows: group by first hop, then longest in-group prefix on the head line.
So paths 96,e0 / 96,e0,01 / … share head '96,e0' and nest 01, … instead of head '96' with
misleading 'e0' as if it were the only endpoint under that branch.
"""
by_first: dict[str, list[list[str]]] = defaultdict(list)
for suf in non_empty:
by_first[suf[0]].append(suf[1:])
specs: list[tuple[str, str]] = []
for ft in sorted(by_first.keys()):
rests = by_first[ft]
full_sufs = [[ft, *r] for r in rests]
if len(full_sufs) == 1:
specs.append(("head", ",".join(full_sufs[0])))
continue
inner_lcp = _longest_common_prefix(full_sufs)
head_text = ",".join(inner_lcp)
remainders = [s[len(inner_lcp) :] for s in full_sufs]
nested = sorted((r for r in remainders if r), key=lambda x: ",".join(x))
if not nested:
specs.append(("head", head_text))
continue
specs.append(("head", head_text))
for rem in nested:
specs.append(("nest", ",".join(rem)))
return specs
def _apply_tee_prefixes(specs: list[tuple[str, str]]) -> list[str]:
"""Assign ├/└ from flattened order; nested rows use   before ├/└. Only the final row uses └."""
n = len(specs)
out: list[str] = []
for i, (kind, text) in enumerate(specs):
last = i == n - 1
if kind == "head":
p = _BRANCH_LAST if last else _BRANCH_INTER
out.append(f"{p} {text}")
else:
p = _BRANCH_LAST if last else _BRANCH_INTER
out.append(f"{_INDENT_NEST}{p} {text}")
return out
def _format_suffix_branch_lines(suffix_tokens: list[list[str]], short_ellipsis: bool) -> list[str]:
"""Format suffixes after display LCP: group by first hop, in-group LCP on head, nest tails indented with U+3000."""
non_empty = [s for s in suffix_tokens if s]
if not non_empty:
return _tree_branch_lines_flat(["..."]) if short_ellipsis else []
if len(non_empty) == 1:
parts = [",".join(non_empty[0])]
if short_ellipsis:
parts.append("...")
return _tree_branch_lines_flat(parts)
specs = _grouped_suffix_line_specs(non_empty)
if short_ellipsis:
specs.append(("head", "..."))
return _apply_tee_prefixes(specs)
def _path_to_tokens(path: str) -> list[str]:
"""Split a comma-separated path into non-empty hex token strings."""
return [p.strip() for p in path.split(",") if p.strip()]
def _is_strict_prefix(a: list[str], b: list[str]) -> bool:
return len(a) < len(b) and b[: len(a)] == a
def _is_truncated_prefix_path(t: list[str], all_paths: list[list[str]]) -> bool:
"""True if t is a strict prefix of some path with more than one extra hop (incomplete trace)."""
for u in all_paths:
if t == u:
continue
if _is_strict_prefix(t, u) and (len(u) - len(t)) > 1:
return True
return False
def _longest_common_prefix(token_lists: list[list[str]]) -> list[str]:
if not token_lists:
return []
if len(token_lists) == 1:
return list(token_lists[0])
first = token_lists[0]
for i, tok in enumerate(first):
for tl in token_lists[1:]:
if i >= len(tl) or tl[i] != tok:
return first[:i]
return list(first[: min(len(tl) for tl in token_lists)])
def _shrink_display_lcp(maximal: list[list[str]], lcp: list[str]) -> list[str]:
"""Shorten displayed LCP when one path ends exactly at LCP and another continues past it.
Avoids showing the shorter path as the 'trunk' with only the tail as a branch (e.g. …0101
on the common line and └ 0970), which reads like a single endpoint plus an offshoot.
"""
lcp = list(lcp)
while len(lcp) > 1:
has_exact = any(t == lcp for t in maximal)
has_extend = any(_is_strict_prefix(lcp, t) for t in maximal)
if has_exact and has_extend:
lcp.pop()
else:
break
return lcp
def _format_path_cluster(token_lists: list[list[str]], use_brackets: bool) -> list[str]:
"""Format a cluster into condensed lines (common prefix + ┐ + ├/└, nested tails indented with  ).
The shared path line ends with ┐ (U+2510) when branch lines follow. Suffixes are grouped by
their first hop after the display LCP; within a group, the longest common prefix of all
suffixes in that group is one ├ line, then  ├/ └ for each distinct tail. The last line of the
block uses └ at top level or  └ when nested.
If one path stops exactly where another continues, the displayed LCP is shortened so the shared
segment is not mistaken for a single endpoint (e.g. only └ tail after a full shorter path).
"""
token_lists = [t for t in token_lists if t]
if not token_lists:
return []
if len(token_lists) == 1:
s = ",".join(token_lists[0])
return [f"[{s}]"] if use_brackets else [s]
maximal = [t for t in token_lists if not _is_truncated_prefix_path(t, token_lists)]
short_ellipsis = len(maximal) < len(token_lists)
if not maximal:
return _tree_branch_lines_flat(["..."]) if short_ellipsis else []
raw_lcp = _longest_common_prefix(maximal)
lcp = _shrink_display_lcp(maximal, raw_lcp)
if len(lcp) > 0:
suffix_tokens = [t[len(lcp) :] for t in maximal]
common = ",".join(lcp)
branch_lines = _format_suffix_branch_lines(suffix_tokens, short_ellipsis)
if branch_lines:
lines = [f"{common} {_BRANCH_CORNER}"]
lines.extend(branch_lines)
else:
lines = [common]
return lines
if len(maximal) == 1:
s = ",".join(maximal[0])
lines = [f"[{s}]"] if use_brackets else [s]
if short_ellipsis:
lines.extend(_tree_branch_lines_flat(["..."]))
return lines
groups: dict[str, list[list[str]]] = {}
for t in maximal:
groups.setdefault(t[0], []).append(t)
lines: list[str] = []
multi = len(groups) > 1
for ft in sorted(groups.keys()):
sub_lines = _format_path_cluster(groups[ft], use_brackets=multi)
lines.extend(sub_lines)
if short_ellipsis:
lines.extend(_tree_branch_lines_flat(["..."]))
return lines
def _condense_path_lines(paths: list[str]) -> str:
"""Condense sorted unique path strings by shared prefix and branch suffixes."""
if len(paths) <= 1:
return "\n".join(paths)
token_lists = [_path_to_tokens(p) for p in paths]
lines = _format_path_cluster(token_lists, use_brackets=False)
return "\n".join(lines)
@dataclass
class MultitestSession:
    """Represents an active multitest listening session"""
    # Key under which the session is stored in the command's active-session map.
    user_id: str
    # Packet hash of the triggering message; only packets with this hash are collected.
    target_packet_hash: str
    # Timestamp of the triggering packet's RF data; older RF entries are ignored when scanning.
    triggering_timestamp: float
    # time.time() value captured when the session was created.
    listening_start_time: float
    # Length of the listening window in seconds.
    listening_duration: float
    # Unique comma-separated path strings gathered so far.
    collected_paths: set[str]
    # Path extracted from the triggering message itself, if any.
    initial_path: Optional[str] = None
class MultitestCommand(BaseCommand):
    """Handles the multitest command - listens for multiple path variations.

    On execution the command records the packet hash of the triggering
    message, listens for a fixed window (6 s by default, 10 s for ``long``,
    14 s for ``xlong``), collects the path of every packet seen with the same
    hash, and replies with the unique paths found.
    """

    # Plugin metadata
    name = "multitest"
    keywords = ['multitest', 'mt']
    description = "Listens for 6 seconds and collects all unique paths from incoming messages"
    category = "meshcore_info"

    # Documentation
    short_description = "Listens for 6 seconds and collects all unique paths your incoming messages took to reach the bot"
    usage = "multitest"
    examples = ["multitest", "mt"]

    # Listening window in seconds, by command variant ("mt long", "mt xlong").
    _DEFAULT_LISTENING_DURATION = 6.0
    _VARIANT_DURATIONS = {'long': 10.0, 'xlong': 14.0}

    # Accepted per-node token widths (1, 2 or 3 bytes per hop) and hex alphabet.
    _VALID_NODE_LENGTHS = (2, 4, 6)
    _HEX_CHARS = frozenset('0123456789abcdefABCDEF')

    def __init__(self, bot):
        """Initialise the command, its session table and its configuration."""
        super().__init__(bot)
        self.multitest_enabled = self.get_config_value('Multitest_Command', 'enabled', fallback=True, value_type='bool')
        # Track active sessions per user to prevent race conditions
        # Key: user_id, Value: MultitestSession
        self._active_sessions: dict[str, MultitestSession] = {}
        # Lock to prevent concurrent execution from interfering (lazily initialized)
        self._execution_lock: Optional[asyncio.Lock] = None
        self._load_config()

    def _get_execution_lock(self) -> asyncio.Lock:
        """Get or create the execution lock (lazy initialization)"""
        if self._execution_lock is None:
            self._execution_lock = asyncio.Lock()
        return self._execution_lock

    def can_execute(self, message: MeshMessage, skip_channel_check: bool = False) -> bool:
        """Check if this command can be executed with the given message.

        Args:
            message: The message triggering the command.
            skip_channel_check: Accepted for interface compatibility; it is
                not forwarded to the base class.

        Returns:
            bool: True if command is enabled and checks pass, False otherwise.
        """
        if not self.multitest_enabled:
            return False
        # NOTE(review): skip_channel_check is dropped here. If
        # BaseCommand.can_execute accepts it, it probably should be forwarded
        # - confirm against the base class before changing.
        return super().can_execute(message)

    def _load_config(self):
        """Load configuration for multitest command.

        Sets ``self.response_format`` (``None`` means use the default reply)
        and ``self.condense_paths``.
        """
        response_format = self.get_config_value('Multitest_Command', 'response_format', fallback='')
        if response_format and response_format.strip():
            # Strip quotes if present (config parser may add them)
            response_format = self._strip_quotes_from_config(response_format).strip()
            # Decode escape sequences (e.g., \n -> newline)
            try:
                # 'backslashreplace' turns characters outside Latin-1 (emoji
                # etc.) into \uXXXX / \UXXXXXXXX escapes instead of raising,
                # and 'unicode_escape' then restores them together with the
                # user's own escapes. A plain latin-1 encode would fail on
                # such characters and leave "\n" undecoded.
                self.response_format = (
                    response_format
                    .encode('latin-1', 'backslashreplace')
                    .decode('unicode_escape')
                )
            except (UnicodeDecodeError, UnicodeEncodeError):
                # If decoding fails (e.g. malformed escape), use as-is
                self.response_format = response_format
        else:
            self.response_format = None  # Use default format
        self.condense_paths = self.get_config_value(
            'Multitest_Command', 'condense_paths', fallback=False, value_type='bool'
        )

    def get_help_text(self) -> str:
        """Return the translated help text for this command."""
        return self.translate('commands.multitest.help', fallback="Listens for 6 seconds and collects all unique paths from incoming messages")

    @staticmethod
    def _normalize_command_text(content: Optional[str]) -> str:
        """Trim the text, drop one leading '!' prefix, and lowercase it."""
        text = (content or '').strip()
        if text.startswith('!'):
            text = text[1:].strip()
        return text.lower()

    @staticmethod
    def _parse_variant(command_text: str) -> Optional[str]:
        """Return the word after 'mt'/'multitest' in normalised text, if any."""
        if not command_text.startswith(('mt ', 'multitest ')):
            return None
        parts = command_text.split()
        return parts[1] if len(parts) >= 2 else None

    def matches_keyword(self, message: MeshMessage) -> bool:
        """Check if message matches multitest keyword"""
        command_text = self._normalize_command_text(message.content)
        # Exact match or keyword followed by a space
        if any(
            command_text == keyword or command_text.startswith(keyword + ' ')
            for keyword in self.keywords
        ):
            return True
        # Variants: "mt long", "mt xlong", "multitest long", "multitest xlong".
        # Checked separately so they still match if self.keywords is changed.
        return self._parse_variant(command_text) in self._VARIANT_DURATIONS

    @staticmethod
    def _split_path_hex(path_hex: str, hex_chars_per_node: int) -> list[str]:
        """Chunk a continuous hex path into per-node strings.

        Falls back to 2 hex characters per node when the string length is not
        a multiple of the requested width.
        """
        width = hex_chars_per_node
        if len(path_hex) % width != 0:
            width = 2
        return [path_hex[i:i + width] for i in range(0, len(path_hex), width)]

    @classmethod
    def _valid_hex_nodes(cls, path_nodes) -> list[str]:
        """Return lowercased, trimmed nodes that are 2, 4 or 6 hex characters.

        Nodes that fail validation are dropped rather than rejected as a
        whole.
        """
        valid = []
        for node in path_nodes:
            node_str = str(node).lower().strip()
            if len(node_str) in cls._VALID_NODE_LENGTHS and all(c in cls._HEX_CHARS for c in node_str):
                valid.append(node_str)
        return valid

    def extract_path_from_rf_data(self, rf_data: dict) -> Optional[str]:
        """Extract path in prefix string format from RF data routing_info.

        Supports 1-, 2-, and 3-byte-per-hop (2, 4, or 6 hex chars per node).

        Returns:
            Comma-separated lowercase node prefixes, or None when there is no
            routing info, no valid node, or extraction raised.
        """
        try:
            routing_info = rf_data.get('routing_info')
            if not routing_info:
                return None
            path_nodes = routing_info.get('path_nodes', [])
            if not path_nodes:
                # Fallback: build from path_hex using bytes_per_hop from packet
                path_hex = routing_info.get('path_hex', '')
                if path_hex:
                    bytes_per_hop = routing_info.get('bytes_per_hop')
                    if bytes_per_hop and bytes_per_hop >= 1:
                        width = bytes_per_hop * 2
                    else:
                        width = getattr(self.bot, 'prefix_hex_chars', 2)
                    if width <= 0:
                        width = 2
                    path_nodes = self._split_path_hex(path_hex, width)
            if path_nodes:
                valid_parts = self._valid_hex_nodes(path_nodes)
                if valid_parts:
                    return ','.join(valid_parts)
            return None
        except Exception as e:
            # Best-effort: the data comes from received packets, so any
            # malformed shape is logged and treated as "no path".
            self.logger.debug(f"Error extracting path from RF data: {e}")
            return None

    def extract_path_from_message(self, message: MeshMessage) -> Optional[str]:
        """Extract path in prefix string format from a message.

        Prefers message.routing_info.path_nodes when present (multi-byte).
        When routing_info has bytes_per_hop, uses it instead of inferring hop size.
        Otherwise parses message.path: comma-separated tokens infer (2/4/6 hex);
        continuous hex uses bot.prefix_hex_chars via parse_path_string().

        Returns:
            Comma-separated lowercase node prefixes, or None for direct
            (0-hop) messages or when no path can be parsed.
        """
        routing_info = getattr(message, 'routing_info', None)
        bytes_per_hop = routing_info.get('bytes_per_hop') if routing_info else None
        hex_chars_per_node = (bytes_per_hop * 2) if (bytes_per_hop and bytes_per_hop >= 1) else None

        # Prefer routing_info when present (same as path command; no re-parse)
        if routing_info is not None:
            if routing_info.get('path_length', 0) == 0:
                return None
            path_nodes = routing_info.get('path_nodes', [])
            if not path_nodes and hex_chars_per_node:
                # Build from path_hex when path_nodes missing but bytes_per_hop known
                path_hex = routing_info.get('path_hex', '')
                if path_hex:
                    path_nodes = self._split_path_hex(path_hex, hex_chars_per_node)
            if path_nodes:
                valid = self._valid_hex_nodes(path_nodes)
                if valid:
                    return ','.join(valid)

        if not message.path:
            return None
        if "Direct" in message.path or "0 hops" in message.path:
            return None

        path_string = message.path
        if " via ROUTE_TYPE_" in path_string:
            path_string = path_string.split(" via ROUTE_TYPE_")[0]
        # Drop a trailing "(N hops)" style annotation
        path_string = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_string, flags=re.IGNORECASE).strip()

        # When bytes_per_hop is known, use it; otherwise infer from commas or use bot.prefix_hex_chars
        expected_n = hex_chars_per_node or getattr(self.bot, 'prefix_hex_chars', 2)
        if expected_n <= 0:
            expected_n = 2

        if ',' in path_string:
            tokens = [t.strip() for t in path_string.split(',') if t.strip()]
            if tokens:
                all_hex = all(all(c in self._HEX_CHARS for c in t) for t in tokens)
                lengths = {len(t) for t in tokens}
                if hex_chars_per_node:
                    # Use known hop size: all tokens must be that length
                    accepted = all_hex and lengths == {expected_n}
                else:
                    # Infer from token length (2, 4, or 6, all same)
                    accepted = (
                        all_hex
                        and len(lengths) == 1
                        and next(iter(lengths)) in self._VALID_NODE_LENGTHS
                    )
                if accepted:
                    return ','.join(t.lower() for t in tokens)

        # Continuous hex (or unusable comma tokens): use bytes_per_hop when
        # known, else bot.prefix_hex_chars
        node_ids = parse_path_string(path_string, prefix_hex_chars=expected_n)
        if node_ids:
            return ','.join(n.lower() for n in node_ids)
        return None

    def get_rf_data_for_message(self, message: MeshMessage) -> Optional[dict]:
        """Get RF data for a message by looking it up in recent RF data.

        Tries, in order: lookup by full sender pubkey, lookup by its first 16
        characters, a scan of recent entries whose ``pubkey_prefix`` starts
        the sender pubkey, and finally the newest entry from the last 5
        seconds. Returns None when nothing is found or a lookup raises.
        """
        try:
            handler = self.bot.message_handler
            sender_pubkey = message.sender_pubkey

            # Strategy 1: Use sender_pubkey to find recent RF data
            if sender_pubkey:
                # Try full pubkey first
                recent_rf_data = handler.find_recent_rf_data(sender_pubkey)
                if recent_rf_data:
                    return recent_rf_data
                # Try pubkey prefix (first 16 chars)
                if len(sender_pubkey) >= 16:
                    recent_rf_data = handler.find_recent_rf_data(sender_pubkey[:16])
                    if recent_rf_data:
                        return recent_rf_data

            # Strategy 2: Look through recent RF data for matching pubkey
            if sender_pubkey and handler.recent_rf_data:
                for rf_data in reversed(handler.recent_rf_data):
                    rf_pubkey = rf_data.get('pubkey_prefix', '')
                    if rf_pubkey and sender_pubkey.startswith(rf_pubkey):
                        return rf_data

            # Strategy 3: Use most recent RF data as fallback
            # This is less reliable but might work if timing is very close
            if handler.recent_rf_data:
                current_time = time.time()
                recent_entries = [
                    rf for rf in handler.recent_rf_data
                    if current_time - rf.get('timestamp', 0) < 5.0  # Within last 5 seconds
                ]
                if recent_entries:
                    return max(recent_entries, key=lambda x: x.get('timestamp', 0))
            return None
        except Exception as e:
            self.logger.debug(f"Error getting RF data for message: {e}")
            return None

    def _resolve_packet_hash(self, rf_data: dict) -> Optional[str]:
        """Return the packet hash for an RF data entry, or None.

        Uses the stored ``packet_hash`` when present; otherwise computes it
        from ``raw_hex`` with the ``payload_type`` from routing_info (it must
        match the one used at ingestion). Calculation errors are logged and
        reported as None.
        """
        packet_hash = rf_data.get('packet_hash')
        if packet_hash:
            return packet_hash
        raw_hex = rf_data.get('raw_hex')
        if not raw_hex:
            return None
        # Fallback: calculate hash if not stored (for older RF data)
        routing_info = rf_data.get('routing_info') or {}
        payload_type = routing_info.get('payload_type')
        try:
            return calculate_packet_hash(raw_hex, payload_type) or None
        except Exception as e:
            self.logger.debug(f"Error calculating packet hash: {e}")
            return None

    def on_message_received(self, message: MeshMessage):
        """Callback method called by message handler when a message is received during listening.

        Checks all active sessions to see if this message matches any of them
        and, for each unexpired session whose target hash equals the
        message's packet hash, records the message's path.
        """
        if not self._active_sessions:
            return

        # Get RF data for this message (contains pre-calculated packet hash)
        rf_data = self.get_rf_data_for_message(message)
        if not rf_data:
            # Can't get RF data, skip this message
            self.logger.debug(f"Skipping message - no RF data found (sender: {message.sender_id})")
            return

        message_hash = self._resolve_packet_hash(rf_data)
        if not message_hash:
            # Can't determine hash, skip this message
            self.logger.debug(f"Skipping message - could not determine packet hash (sender: {message.sender_id})")
            return

        # Check all active sessions to see if this message matches any of them
        current_time = time.time()
        for user_id, session in list(self._active_sessions.items()):
            # Check if we're still in the listening window for this session
            elapsed = current_time - session.listening_start_time
            if elapsed >= session.listening_duration:
                continue  # Session expired, skip it

            # CRITICAL: Only collect paths if this message has the same hash as the target
            # This ensures we only track variations of the same original message
            if message_hash != session.target_packet_hash:
                self.logger.debug(f"✗ Hash mismatch for user {user_id} - target: {session.target_packet_hash[:8]}..., received: {message_hash[:8]}... (sender: {message.sender_id})")
                continue

            # Try RF data first (more reliable), then the message's own path
            path = self.extract_path_from_rf_data(rf_data)
            if not path:
                path = self.extract_path_from_message(message)

            if path:
                session.collected_paths.add(path)
                self.logger.info(f"✓ Collected path for user {user_id}: {path} (hash: {message_hash[:8]}...)")
                continue

            # Matching hash but no extractable path: log why
            routing_info = rf_data.get('routing_info') or {}
            path_length = routing_info.get('path_length', 0)
            if path_length == 0:
                self.logger.debug(f"Matched hash {message_hash[:8]}... but path is direct (0 hops) for user {user_id}")
            else:
                self.logger.debug(f"Matched hash {message_hash[:8]}... but couldn't extract path from routing_info: {routing_info} for user {user_id}")

    def _scan_recent_rf_data(self, session: MultitestSession):
        """Scan recent RF data for packets with matching hash (for messages that haven't been processed yet)

        Args:
            session: The multitest session to scan for
        """
        if not session.target_packet_hash:
            return
        try:
            current_time = time.time()
            matching_count = 0
            mismatching_count = 0
            for rf_data in self.bot.message_handler.recent_rf_data:
                rf_timestamp = rf_data.get('timestamp', 0)
                time_diff = current_time - rf_timestamp
                # Only include RF data from the triggering message timestamp onwards
                # This prevents collecting packets from earlier messages that happen to have the same hash
                if rf_timestamp < session.triggering_timestamp or time_diff > session.listening_duration:
                    continue

                packet_hash = rf_data.get('packet_hash')
                # CRITICAL: Only process if hash matches exactly and is not None/empty
                if packet_hash and packet_hash == session.target_packet_hash:
                    matching_count += 1
                    path = self.extract_path_from_rf_data(rf_data)
                    if path:
                        session.collected_paths.add(path)
                        self.logger.info(f"✓ Collected path from RF scan for user {session.user_id}: {path} (hash: {packet_hash[:8]}..., time: {time_diff:.2f}s)")
                    else:
                        self.logger.debug(f"Matched hash {packet_hash[:8]}... in RF scan but couldn't extract path for user {session.user_id}")
                elif packet_hash:
                    mismatching_count += 1
                    # Only log first few mismatches to avoid spam
                    if mismatching_count <= 3:
                        self.logger.debug(f"✗ RF scan hash mismatch for user {session.user_id} - target: {session.target_packet_hash[:8]}..., found: {packet_hash[:8]}... (time: {time_diff:.2f}s)")

            if matching_count > 0 or mismatching_count > 0:
                self.logger.debug(f"RF scan complete for user {session.user_id}: {matching_count} matching, {mismatching_count} mismatching packets")
        except Exception as e:
            self.logger.debug(f"Error scanning recent RF data for user {session.user_id}: {e}")

    def _build_response(self, message: MeshMessage, session: MultitestSession) -> str:
        """Build the reply text for a finished session.

        With collected paths: the configured ``response_format`` if it formats
        cleanly, otherwise the default "Found N unique path(s)" text. Without
        paths: a diagnostic message including the tracked hash.
        """
        if session.collected_paths:
            # Sort paths for consistent output
            sorted_paths = sorted(session.collected_paths)
            if self.condense_paths and len(sorted_paths) > 1:
                paths_text = _condense_path_lines(sorted_paths)
            else:
                paths_text = "\n".join(sorted_paths)
            path_count = len(sorted_paths)
            default_response = f"Found {path_count} unique path(s):\n{paths_text}"

            if not self.response_format:
                return default_response
            try:
                return self.response_format.format(
                    sender=message.sender_id or "Unknown",
                    path_count=path_count,
                    paths=paths_text,
                    listening_duration=int(session.listening_duration)
                )
            except (KeyError, IndexError, ValueError, AttributeError) as e:
                # A user-supplied template can fail in several ways: unknown
                # field (KeyError), positional "{}" (IndexError), bad spec
                # (ValueError), attribute access on a value (AttributeError).
                self.logger.debug(f"Error formatting multitest response: {e}, using default format")
                return default_response

        # Provide more helpful error message with diagnostic info
        tracking_hash = session.target_packet_hash
        recent_rf_data = self.bot.message_handler.recent_rf_data
        matching_packets = 0
        if recent_rf_data and tracking_hash:
            matching_packets = sum(
                1 for rf_data in recent_rf_data
                if rf_data.get('packet_hash') == tracking_hash
            )

        if tracking_hash is None:
            return ("Error: Could not determine packet hash for tracking. "
                    "The triggering message may not have valid packet data.")
        if matching_packets > 0:
            return (f"No paths extracted from {matching_packets} matching packet(s) "
                    f"(hash: {tracking_hash}). "
                    f"Packets may be direct (0 hops) or path extraction failed.")
        return (f"No matching packets found during {session.listening_duration}s window. "
                f"Tracking hash: {tracking_hash}. ")

    async def execute(self, message: MeshMessage) -> bool:
        """Execute the multitest command.

        Registers a listening session for the sender (unless one is already
        running), waits for the listening window, then sends the collected
        paths. Always returns True.
        """
        user_id = message.sender_id or "unknown"

        # Use lock to prevent concurrent execution from interfering
        async with self._get_execution_lock():
            # Check if user already has an active session
            existing_session = self._active_sessions.get(user_id)
            if existing_session is not None:
                elapsed = time.time() - existing_session.listening_start_time
                if elapsed < existing_session.listening_duration:
                    # User already has an active session - silently ignore second Mt
                    # so the first session can complete and send its response
                    return True

            # Record execution time BEFORE starting async work to prevent race conditions
            self.record_execution(user_id)

            # Determine listening duration based on command variant
            variant = self._parse_variant(self._normalize_command_text(message.content))
            listening_duration = self._VARIANT_DURATIONS.get(variant, self._DEFAULT_LISTENING_DURATION)
            variant_label = f" ({variant})" if variant in self._VARIANT_DURATIONS else ""
            self.logger.info(f"Multitest command{variant_label} executed by {user_id} - starting {int(listening_duration)} second listening window")

            # Get RF data for the triggering message (contains pre-calculated packet hash)
            rf_data = self.get_rf_data_for_message(message)
            if not rf_data:
                response = "Error: Could not find packet data for this message. Please try again."
                await self.send_response(message, response)
                return True

            # Stored hash if available, otherwise calculated; a calculation
            # error is reported to the user instead of propagating.
            packet_hash = self._resolve_packet_hash(rf_data)
            if not packet_hash:
                response = "Error: Could not calculate packet hash for this message. Please try again."
                await self.send_response(message, response)
                return True

            # Store the timestamp of the triggering message to avoid collecting older packets
            triggering_rf_timestamp = rf_data.get('timestamp', time.time())

            # Also extract path from the triggering message itself, falling
            # back to its RF data
            initial_path = self.extract_path_from_message(message)
            if not initial_path:
                initial_path = self.extract_path_from_rf_data(rf_data)
            if initial_path:
                self.logger.debug(f"Initial path from triggering message for user {user_id}: {initial_path}")

            # Create a new session for this user
            session = MultitestSession(
                user_id=user_id,
                target_packet_hash=packet_hash,
                triggering_timestamp=triggering_rf_timestamp,
                listening_start_time=time.time(),
                listening_duration=listening_duration,
                collected_paths=set(),
                initial_path=initial_path
            )
            # Add initial path if available
            if initial_path:
                session.collected_paths.add(initial_path)

            # Register this session
            self._active_sessions[user_id] = session

            # Register this command instance as the active listener (if not already registered)
            # Store reference in message handler so it can call on_message_received
            if self.bot.message_handler.multitest_listener is None:
                self.bot.message_handler.multitest_listener = self

            self.logger.info(f"Tracking packet hash for user {user_id}: {packet_hash[:16]}... (full: {packet_hash})")
            self.logger.debug(f"Triggering message timestamp for user {user_id}: {triggering_rf_timestamp}")

        # Lock is released before the sleep so other users can start sessions.
        # Scan recent RF data for matching hashes (in case messages haven't
        # been processed yet), limited to packets at or after the trigger.
        self._scan_recent_rf_data(session)

        try:
            # Wait for the listening duration
            await asyncio.sleep(session.listening_duration)
        finally:
            # Re-acquire lock to clean up session
            async with self._get_execution_lock():
                # Remove only our own session: a newer session for the same
                # user may have replaced it once this one expired.
                if self._active_sessions.get(user_id) is session:
                    del self._active_sessions[user_id]
                # Unregister listener if no more active sessions
                if not self._active_sessions and self.bot.message_handler.multitest_listener is self:
                    self.bot.message_handler.multitest_listener = None

        # Do a final scan of RF data in case any matching packets arrived
        self._scan_recent_rf_data(session)

        response = self._build_response(message, session)

        # Wait for bot TX rate limiter cooldown to expire before sending
        # This ensures we respond even if another command put the bot on cooldown
        await self.bot.bot_tx_rate_limiter.wait_for_tx()

        # Also wait for user rate limiter if needed
        if not self.bot.rate_limiter.can_send():
            wait_time = self.bot.rate_limiter.time_until_next()
            if wait_time > 0:
                self.logger.info(f"Waiting {wait_time:.1f} seconds for rate limiter")
                await asyncio.sleep(wait_time + 0.1)  # Small buffer

        # Send the response
        await self.send_response(message, response)
        return True