Files
meshcore-bot/modules/commands/path_command.py
agessaman 21676f0792 feat(worldcup): add support for announcing disallowed goals and enhance goal tracking
- Introduced a new configuration option `announce_disallowed` to allow notifications for goals overturned by VAR.
- Updated `WorldCupLiveService` to handle disallowed goals, including formatting for previously announced scorers.
- Enhanced `ESPNClient` to ensure accurate goal tracking and state management.
- Added unit tests to validate the new disallowed goal functionality and ensure correct behavior during matches.
2026-06-17 22:11:24 -07:00

936 lines
50 KiB
Python

#!/usr/bin/env python3
"""
Path Decode Command for the MeshCore Bot
Decodes hex path data to show which repeaters were involved in message routing
"""
import asyncio
import re
import time
from typing import Any, Callable, Optional
from ..models import MeshMessage
from ..path_inference import (
PathInferenceConfig,
select_node_repeater,
select_repeater_by_graph,
)
from ..utils import (
bytes_per_hop_from_routing_and_nodes,
parse_path_string,
public_key_has_prefix,
)
from .base_command import BaseCommand
class PathCommand(BaseCommand):
"""Command for decoding path data to repeater names"""
# Plugin metadata
name = "path"
keywords = ["path", "decode", "route"]
description = "Decode hex path data to show which repeaters were involved in message routing"
requires_dm = False
cooldown_seconds = 1
category = "meshcore_info"
# Documentation
short_description = "Decode path data to show repeaters involved in message routing"
usage = "path [hex_data]"
examples = ["path", "decode"]
def __init__(self, bot):
    """Load path-decoding settings from the bot configuration.

    Every tunable is read from the ``Path_Command`` section through the
    config object's ``get``/``getboolean``/``getint``/``getfloat`` getters with
    ``fallback=`` defaults; the bot's own coordinates come from the ``Bot``
    section. Weights and thresholds documented as 0.0-1.0 are clamped into
    that range after being read.

    Args:
        bot: Bot instance; ``bot.config`` supplies the configuration.
    """
    super().__init__(bot)
    config = bot.config
    section = 'Path_Command'

    def clamp_unit(value):
        """Clamp a weight or threshold into the closed range 0.0-1.0."""
        return max(0.0, min(1.0, value))

    self.path_enabled = self.get_config_value(section, 'enabled', fallback=True, value_type='bool')

    # Explicit config toggle; set False to disable even when bot lat/lon are configured.
    self.geographic_scoring_config_enabled = config.getboolean(
        section, 'geographic_scoring_enabled', fallback=True
    )

    # Geographic guessing stays off until a valid bot location is found below.
    self.geographic_guessing_enabled = False
    self.bot_latitude = None
    self.bot_longitude = None

    # Proximity calculation settings.
    self.proximity_method = config.get(section, 'proximity_method', fallback='simple')
    self.path_proximity_fallback = config.getboolean(section, 'path_proximity_fallback', fallback=True)
    self.max_proximity_range = config.getfloat(section, 'max_proximity_range', fallback=200.0)
    self.max_repeater_age_days = config.getint(section, 'max_repeater_age_days', fallback=14)

    # Recency/proximity weighting: 1.0 = 100% recency, 0.0 = 100% proximity.
    # The default 0.4 means 40% recency, 60% proximity.
    self.recency_weight = clamp_unit(config.getfloat(section, 'recency_weight', fallback=0.4))
    self.proximity_weight = 1.0 - self.recency_weight

    # Recency decay half-life (default 12 hours; 36-48 suggested for 48-72 hour advert intervals).
    self.recency_decay_half_life_hours = config.getfloat(
        section, 'recency_decay_half_life_hours', fallback=12.0
    )

    # A preset only supplies fallbacks; each individual setting read below overrides it.
    # Tuple order: (graph confidence threshold, distance threshold km,
    #               distance penalty strength, final-hop proximity weight).
    preset = config.get(section, 'path_selection_preset', fallback='balanced').lower()
    preset_table = {
        'geographic': (0.5, 30.0, 0.5, 0.4),   # prioritize geographic proximity
        'graph': (0.9, 50.0, 0.2, 0.15),       # prioritize graph evidence
    }
    balanced_defaults = (0.7, 30.0, 0.3, 0.25)  # used for 'balanced' and any unknown preset
    (
        preset_graph_confidence_threshold,
        preset_distance_threshold,
        preset_distance_penalty,
        preset_final_hop_weight,
    ) = preset_table.get(preset, balanced_defaults)

    # Graph-based validation settings.
    self.graph_based_validation = config.getboolean(section, 'graph_based_validation', fallback=True)
    self.min_edge_observations = config.getint(section, 'min_edge_observations', fallback=3)

    # Enhanced graph features.
    self.graph_use_bidirectional = config.getboolean(section, 'graph_use_bidirectional', fallback=True)
    self.graph_use_hop_position = config.getboolean(section, 'graph_use_hop_position', fallback=True)
    self.graph_multi_hop_enabled = config.getboolean(section, 'graph_multi_hop_enabled', fallback=True)
    self.graph_multi_hop_max_hops = config.getint(section, 'graph_multi_hop_max_hops', fallback=2)
    self.graph_geographic_combined = config.getboolean(section, 'graph_geographic_combined', fallback=False)
    self.graph_geographic_weight = clamp_unit(
        config.getfloat(section, 'graph_geographic_weight', fallback=0.7)
    )
    self.graph_confidence_override_threshold = clamp_unit(
        config.getfloat(
            section, 'graph_confidence_override_threshold',
            fallback=preset_graph_confidence_threshold,
        )
    )
    self.graph_distance_penalty_enabled = config.getboolean(
        section, 'graph_distance_penalty_enabled', fallback=True
    )
    self.graph_max_reasonable_hop_distance_km = config.getfloat(
        section, 'graph_max_reasonable_hop_distance_km', fallback=preset_distance_threshold
    )
    self.graph_distance_penalty_strength = clamp_unit(
        config.getfloat(section, 'graph_distance_penalty_strength', fallback=preset_distance_penalty)
    )
    self.graph_zero_hop_bonus = clamp_unit(config.getfloat(section, 'graph_zero_hop_bonus', fallback=0.4))
    self.graph_prefer_stored_keys = config.getboolean(section, 'graph_prefer_stored_keys', fallback=True)

    # Final hop proximity settings for graph selection.
    # Defaults based on LoRa ranges: typical < 30km, long up to 200km, very close < 10km.
    self.graph_final_hop_proximity_enabled = config.getboolean(
        section, 'graph_final_hop_proximity_enabled', fallback=True
    )
    self.graph_final_hop_proximity_weight = clamp_unit(
        config.getfloat(section, 'graph_final_hop_proximity_weight', fallback=preset_final_hop_weight)
    )
    self.graph_final_hop_max_distance = config.getfloat(section, 'graph_final_hop_max_distance', fallback=0.0)
    self.graph_final_hop_proximity_normalization_km = config.getfloat(
        section, 'graph_final_hop_proximity_normalization_km', fallback=200.0
    )  # Long LoRa range
    self.graph_final_hop_very_close_threshold_km = config.getfloat(
        section, 'graph_final_hop_very_close_threshold_km', fallback=10.0
    )
    self.graph_final_hop_close_threshold_km = config.getfloat(
        section, 'graph_final_hop_close_threshold_km', fallback=30.0
    )  # Typical LoRa range
    self.graph_final_hop_max_proximity_weight = clamp_unit(
        config.getfloat(section, 'graph_final_hop_max_proximity_weight', fallback=0.6)
    )
    self.graph_path_validation_max_bonus = clamp_unit(
        config.getfloat(section, 'graph_path_validation_max_bonus', fallback=0.3)
    )
    self.graph_path_validation_obs_divisor = config.getfloat(
        section, 'graph_path_validation_obs_divisor', fallback=50.0
    )

    # Star bias multiplier: how much to boost starred repeaters' scores.
    # Default 2.5 means 2.5x the normal score; never allowed below 1.0.
    self.star_bias_multiplier = max(
        1.0, config.getfloat(section, 'star_bias_multiplier', fallback=2.5)
    )

    # Confidence indicator symbols.
    self.high_confidence_symbol = config.get(section, 'high_confidence_symbol', fallback='🎯')
    self.medium_confidence_symbol = config.get(section, 'medium_confidence_symbol', fallback='📍')
    self.low_confidence_symbol = config.get(section, 'low_confidence_symbol', fallback='')

    # "p" shortcut (on by default).
    self.enable_p_shortcut = config.getboolean(section, 'enable_p_shortcut', fallback=True)
    if self.enable_p_shortcut and "p" not in self.keywords:
        # Rebind a fresh per-instance list instead of appending in place:
        # ``keywords`` is declared on the class, so an in-place append would
        # add "p" for every instance, including ones with the shortcut disabled.
        self.keywords = [*self.keywords, "p"]

    reply_prefix_raw = config.get(section, 'reply_prefix', fallback='')
    self.path_reply_prefix = self._strip_quotes_from_config(reply_prefix_raw).strip()

    minimum_path_bytes_raw = config.getint(section, 'minimum_path_bytes', fallback=0)
    if minimum_path_bytes_raw in (0, 1, 2, 3):
        self.minimum_path_bytes = minimum_path_bytes_raw
    else:
        self.logger.warning(
            f"Invalid Path_Command.minimum_path_bytes={minimum_path_bytes_raw}; defaulting to 0"
        )
        self.minimum_path_bytes = 0

    # Bot location: enables geographic proximity guessing when valid and not disabled by config.
    try:
        if not config.has_section('Bot'):
            self.logger.info("Bot section not found - geographic proximity guessing disabled")
        else:
            lat = config.getfloat('Bot', 'bot_latitude', fallback=None)
            lon = config.getfloat('Bot', 'bot_longitude', fallback=None)
            if lat is None or lon is None:
                self.logger.info("Bot location not configured - geographic proximity guessing disabled")
            elif not (-90 <= lat <= 90 and -180 <= lon <= 180):
                self.logger.warning(f"Invalid bot coordinates in config: {lat}, {lon}")
            else:
                self.bot_latitude = lat
                self.bot_longitude = lon
                if self.geographic_scoring_config_enabled:
                    self.geographic_guessing_enabled = True
                    self.logger.info(f"Geographic proximity guessing enabled with bot location: {lat:.4f}, {lon:.4f}")
                else:
                    self.logger.info("Geographic proximity guessing disabled via config (geographic_scoring_enabled = false)")
                self.logger.info(f"Proximity method: {self.proximity_method}")
                self.logger.info(f"Max repeater age: {self.max_repeater_age_days} days")
    except Exception as e:
        # Best effort: a malformed location must not prevent the command from loading.
        self.logger.warning(f"Error reading bot location from config: {e} - geographic proximity guessing disabled")
def _inference_config(self) -> PathInferenceConfig:
    """Snapshot this command's current settings into a ``PathInferenceConfig``.

    Attributes are read at call time rather than cached, so values changed on
    the instance after construction (for example by tests) are picked up.
    ``bot_command=True`` is always passed, selecting the bot ``path`` command's
    selection semantics in the shared engine.
    """
    # Settings whose keyword name in the config matches the attribute name here.
    mirrored_fields = (
        'geographic_guessing_enabled',
        'bot_latitude',
        'bot_longitude',
        'proximity_method',
        'path_proximity_fallback',
        'max_proximity_range',
        'max_repeater_age_days',
        'recency_weight',
        'proximity_weight',
        'recency_decay_half_life_hours',
        'graph_based_validation',
        'min_edge_observations',
        'graph_use_bidirectional',
        'graph_use_hop_position',
        'graph_multi_hop_enabled',
        'graph_multi_hop_max_hops',
        'graph_geographic_combined',
        'graph_geographic_weight',
        'graph_confidence_override_threshold',
        'graph_distance_penalty_enabled',
        'graph_max_reasonable_hop_distance_km',
        'graph_distance_penalty_strength',
        'graph_zero_hop_bonus',
        'graph_prefer_stored_keys',
        'graph_final_hop_proximity_enabled',
        'graph_final_hop_proximity_weight',
        'graph_final_hop_max_distance',
        'graph_final_hop_proximity_normalization_km',
        'graph_final_hop_very_close_threshold_km',
        'graph_final_hop_close_threshold_km',
        'graph_final_hop_max_proximity_weight',
        'graph_path_validation_max_bonus',
        'graph_path_validation_obs_divisor',
        'star_bias_multiplier',
    )
    settings = {field: getattr(self, field) for field in mirrored_fields}
    # The explicit config toggle is stored under a different attribute name on this class.
    settings['geographic_scoring_enabled'] = self.geographic_scoring_config_enabled
    settings['bot_command'] = True
    return PathInferenceConfig(**settings)
def _bytes_per_hop_from_nodes_and_routing(
    self, node_ids: list[str], routing_info: Optional[dict[str, Any]]
) -> int:
    """Return the bytes-per-hop value for a path.

    Delegates to ``bytes_per_hop_from_routing_and_nodes``; note that the
    helper takes its arguments in the opposite order (routing info first).
    """
    hop_width = bytes_per_hop_from_routing_and_nodes(routing_info, node_ids)
    return hop_width
def _should_resolve_repeater_names(
self, node_ids: list[str], routing_info: Optional[dict[str, Any]]
) -> bool:
if self.minimum_path_bytes in (0, 1):
return True
bph = self._bytes_per_hop_from_nodes_and_routing(node_ids, routing_info)
return bph >= self.minimum_path_bytes
def _format_path_reply_prefix(self, message: MeshMessage) -> str:
if not self.path_reply_prefix:
return ''
formatted = self.format_response(message, self.path_reply_prefix).rstrip()
if not formatted:
return ''
return formatted + '\n'
def _format_repeater_resolution_deferred(self, node_ids: list[str]) -> str:
    """Build the reply used when repeater names are not resolved for this path.

    The node IDs are joined with commas and passed, together with the
    configured ``minimum_path_bytes``, to the
    ``commands.path.repeater_resolution_deferred`` translation.
    """
    return self.translate(
        'commands.path.repeater_resolution_deferred',
        path=','.join(node_ids),
        minimum_path_bytes=self.minimum_path_bytes,
    )
async def _decode_node_ids(
    self, node_ids: list[str], routing_info: Optional[dict[str, Any]] = None
) -> str:
    """Turn already-parsed node IDs into the text reply.

    Logs the path, then either looks up and formats repeater names or, when
    ``_should_resolve_repeater_names`` says no, returns the deferred-resolution
    message instead.
    """
    self.logger.info(f"Decoding path with {len(node_ids)} nodes: {','.join(node_ids)}")
    if self._should_resolve_repeater_names(node_ids, routing_info):
        resolved = await self._lookup_repeater_names(node_ids)
        return self._format_path_response(node_ids, resolved)
    return self._format_repeater_resolution_deferred(node_ids)
def can_execute(self, message: MeshMessage, skip_channel_check: bool = False) -> bool:
    """Check if this command can be executed with the given message.

    Args:
        message: The message triggering the command.
        skip_channel_check: Forwarded unchanged to the base-class check.
            NOTE(review): assumes ``BaseCommand.can_execute`` accepts this
            keyword, as this override's signature suggests - confirm.

    Returns:
        bool: True if command is enabled and checks pass, False otherwise.
    """
    if not self.path_enabled:
        return False
    # Previously the flag was accepted here but never passed on, so a caller
    # asking to skip the channel check was silently ignored.
    return super().can_execute(message, skip_channel_check=skip_channel_check)
def matches_keyword(self, message: MeshMessage) -> bool:
    """Report whether the message invokes this command.

    With the "p" shortcut enabled, a bare ``p`` or ``p `` followed by more
    text matches. Otherwise the cleaned message must equal one of
    ``self.keywords`` or start with a keyword followed by a space.
    """
    text = self.cleanup_message_for_matching(message)

    if self.enable_p_shortcut:
        bare_shortcut = text == "p"
        shortcut_with_data = text.startswith('p ') and len(text) > 2
        if bare_shortcut or shortcut_with_data:
            return True

    for keyword in self.keywords:
        if text == keyword or text.startswith(keyword + ' '):
            return True
    return False
async def execute(self, message: MeshMessage) -> bool:
    """Run the path decode command for ``message``.

    Always returns True, including when the path-byte requirement check
    declines the message (in which case nothing further is done here).
    """
    self.logger.info(f"Path command executed with content: {message.content}")

    requirement_met = await self.enforce_path_byte_requirement(message, 'Path_Command')
    if not requirement_met:
        return True

    # Remembered so _extract_path_from_recent_messages can use the triggering message.
    self._current_message = message

    words = message.content.strip().split()
    if len(words) >= 2:
        # Everything after the command word is the path to decode.
        response = await self._decode_path(" ".join(words[1:]))
    else:
        # No arguments: decode the path of the current message instead.
        response = await self._extract_path_from_recent_messages()

    # The sender may split a long response into multiple messages.
    await self._send_path_response(message, response)
    return True
async def _decode_path(
    self, path_input: str, routing_info: Optional[dict[str, Any]] = None
) -> str:
    """Decode a user-supplied hex path string into the reply text.

    A comma-separated input whose tokens are all hex and all the same width
    (2, 4, or 6 characters) is taken as-is, upper-cased. Any other input is
    handed to ``parse_path_string`` using ``bot.prefix_hex_chars`` (default 2).
    Any exception is logged and turned into the translated error reply.
    """
    try:
        # Drop a trailing hop-count annotation such as "01,5f (2 hops)".
        cleaned = re.sub(r'\s*\([^)]*hops?[^)]*\)', '', path_input, flags=re.IGNORECASE).strip()

        node_ids = None
        if ',' in cleaned:
            tokens = [piece.strip() for piece in cleaned.split(',') if piece.strip()]
            if tokens:
                # One, two, or three hex byte pairs per token (2, 4, or 6 hex characters).
                all_hex_pairs = all(
                    re.fullmatch(r'(?:[0-9a-fA-F]{2}){1,3}', token) is not None
                    for token in tokens
                )
                uniform_width = len({len(token) for token in tokens}) == 1
                if all_hex_pairs and uniform_width:
                    node_ids = [token.upper() for token in tokens]

        if node_ids is None:
            hex_chars = getattr(self.bot, 'prefix_hex_chars', 2)
            node_ids = parse_path_string(cleaned, prefix_hex_chars=hex_chars)

        if node_ids:
            return await self._decode_node_ids(node_ids, routing_info)
        return self.translate('commands.path.no_valid_hex')
    except Exception as e:
        self.logger.error(f"Error decoding path: {e}")
        return self.translate('commands.path.error_decoding', error=str(e))
async def _lookup_repeater_names(
    self,
    node_ids: list[str],
    lookup_func: Optional[Callable[[str], list[dict[str, Any]]]] = None,
) -> dict[str, dict[str, Any]]:
    """Look up repeater names for given node IDs.

    For each node ID, candidates are gathered (from ``lookup_func`` when given,
    otherwise from the repeater manager and then a direct query of
    ``complete_contact_tracking``). Candidates are handed to the shared
    selection engine; with no candidates, the device contact list is checked.

    Args:
        node_ids: List of node prefixes to look up.
        lookup_func: Optional test hook. When provided, used instead of
            repeater_manager/db_manager. Callable(node_id) -> list of repeater dicts.

    Returns:
        Mapping of node ID to a result entry. Every entry has ``found``; found
        entries carry either the chosen repeater's fields or, when
        ``collision`` is True, ``matches`` and ``repeaters``. If anything
        raises, every node is reported as not found with an ``error`` string.
    """
    repeater_info: dict[str, dict[str, Any]] = {}
    try:
        # The API cache is deliberately not used: it lacks the recency-based
        # proximity selection needed for path decoding.
        # Sender location (for path-proximity first-hop selection) is constant across
        # nodes; compute it once, only when geographic guessing is active.
        sender_location = self._get_sender_location() if self.geographic_guessing_enabled else None

        # The tracked-device list does not depend on the node being resolved, so it is
        # fetched at most once per lookup. It stays None after a failed fetch, which
        # makes the next node retry.
        tracked_devices = None

        for node_id in node_ids:
            if lookup_func is not None:
                # Test dependency injection: use provided lookup when available.
                results = self._normalize_injected_candidates(lookup_func(node_id))
            else:
                results = []
                if hasattr(self.bot, 'repeater_manager'):
                    if tracked_devices is None:
                        tracked_devices = await self._load_tracked_repeater_devices()
                    results = self._match_tracked_devices_by_prefix(tracked_devices, node_id)
                if not results:
                    # Nothing from the repeater manager: query the tracking table directly.
                    results = self._query_contact_tracking_by_prefix(node_id)

            if results:
                repeater_info[node_id] = self._resolve_candidates_for_node(
                    node_id, results, node_ids, sender_location
                )
            else:
                repeater_info[node_id] = self._resolve_from_device_contacts(node_id)
    except Exception as e:
        self.logger.error(f"Error looking up repeater names: {e}")
        # Return basic info for all nodes
        for node_id in node_ids:
            repeater_info[node_id] = {
                'found': False,
                'node_id': node_id,
                'error': str(e)
            }
    return repeater_info

def _normalize_injected_candidates(self, results):
    """Map rows from the ``lookup_func`` test hook onto the candidate dict shape.

    Falsy input (``None`` or an empty list) is returned unchanged.
    """
    if not results:
        return results
    return [
        {
            'name': r['name'],
            'public_key': r['public_key'],
            'device_type': r.get('device_type', 'repeater'),
            'last_seen': r.get('last_seen', r.get('last_heard')),
            'last_heard': r.get('last_heard', r.get('last_seen')),
            'last_advert_timestamp': r.get('last_advert_timestamp'),
            'is_active': r.get('is_active', True),
            'latitude': r.get('latitude'),
            'longitude': r.get('longitude'),
            'city': r.get('city'),
            'state': r.get('state'),
            'country': r.get('country'),
            'advert_count': r.get('advert_count', 1),
            'signal_strength': r.get('signal_strength'),
            'snr': r.get('snr'),
            'hop_count': r.get('hop_count'),
            'role': r.get('role', 'repeater'),
            'is_starred': bool(r.get('is_starred', False)),
        }
        for r in results
    ]

async def _load_tracked_repeater_devices(self):
    """Fetch the repeater manager's device list, or None if the fetch fails."""
    try:
        devices = await self.bot.repeater_manager.get_repeater_devices(include_historical=True)
        return list(devices)
    except Exception as e:
        self.logger.debug(f"Error getting complete database: {e}")
        return None

def _match_tracked_devices_by_prefix(self, devices, node_id: str) -> list[dict[str, Any]]:
    """Return candidate dicts for tracked devices whose public key has the ``node_id`` prefix.

    Returns an empty list when ``devices`` is None or empty, or when a row
    cannot be converted (the error is logged at debug level).
    """
    if not devices:
        return []
    try:
        return [
            {
                'name': row['name'],
                'public_key': row['public_key'],
                'device_type': row['device_type'],
                'last_seen': row['last_heard'],
                'last_heard': row['last_heard'],  # for recency calculation
                'last_advert_timestamp': row.get('last_advert_timestamp'),  # for recency calculation
                'is_active': row['is_currently_tracked'],
                'latitude': row['latitude'],
                'longitude': row['longitude'],
                'city': row['city'],
                'state': row['state'],
                'country': row['country'],
                'advert_count': row['advert_count'],
                'signal_strength': row['signal_strength'],
                'snr': row.get('snr'),  # for zero-hop bonus
                'hop_count': row['hop_count'],
                'role': row['role'],
                'is_starred': bool(row.get('is_starred', 0)),  # star status for bias
            }
            for row in devices
            if public_key_has_prefix(row['public_key'], node_id)
        ]
    except Exception as e:
        self.logger.debug(f"Error getting complete database: {e}")
        return []

def _query_contact_tracking_by_prefix(self, node_id: str) -> list[dict[str, Any]]:
    """Query ``complete_contact_tracking`` for repeaters/roomservers matching ``node_id``.

    When ``max_repeater_age_days`` is positive, rows older than that many days
    are excluded (by ``last_advert_timestamp`` when set, otherwise by
    ``last_heard``). Returns an empty list when nothing matches or the query fails.
    """
    try:
        if self.max_repeater_age_days > 0:
            # max_repeater_age_days is a number read from config (getint in __init__),
            # not user input; the user-supplied prefix is bound as a parameter.
            query = f'''
                SELECT name, public_key, device_type, last_heard, last_heard as last_seen,
                last_advert_timestamp, latitude, longitude, city, state, country,
                advert_count, signal_strength, snr, hop_count, role, is_starred
                FROM complete_contact_tracking
                WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver')
                AND (
                (last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{self.max_repeater_age_days} days'))
                OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{self.max_repeater_age_days} days'))
                )
                ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
                '''
        else:
            query = '''
                SELECT name, public_key, device_type, last_heard, last_heard as last_seen,
                last_advert_timestamp, latitude, longitude, city, state, country,
                advert_count, signal_strength, snr, hop_count, role, is_starred
                FROM complete_contact_tracking
                WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver')
                ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
                '''
        prefix_pattern = f"{node_id}%"
        rows = self.bot.db_manager.execute_query(query, (prefix_pattern,))
        if not rows:
            return []
        return [
            {
                'name': row['name'],
                'public_key': row['public_key'],
                'device_type': row['device_type'],
                'last_seen': row['last_seen'],
                'last_heard': row.get('last_heard', row['last_seen']),
                'last_advert_timestamp': row.get('last_advert_timestamp'),
                'is_active': True,
                'latitude': row['latitude'],
                'longitude': row['longitude'],
                'city': row['city'],
                'state': row['state'],
                'country': row['country'],
                'advert_count': row.get('advert_count', 0),
                'signal_strength': row.get('signal_strength'),
                'snr': row.get('snr'),
                'hop_count': row.get('hop_count'),
                'role': row.get('role'),
                'is_starred': bool(row.get('is_starred', 0)),
            }
            for row in rows
        ]
    except Exception as e:
        self.logger.debug(f"Error querying complete_contact_tracking directly: {e}")
        return []

def _resolve_candidates_for_node(
    self,
    node_id: str,
    candidates: list[dict[str, Any]],
    node_ids: list[str],
    sender_location,
) -> dict[str, Any]:
    """Run the shared selection engine over ``candidates`` and shape the result entry.

    Recency filtering, graph-based disambiguation, and geographic proximity are
    delegated to ``select_node_repeater``; only output shaping happens here.
    """
    # hop_count is intentionally omitted from the engine input (this preserves prior
    # behavior: the bot path's graph selection never applied the zero-hop bonus
    # through this code path).
    repeaters_data = [
        {
            'name': row['name'],
            'public_key': row['public_key'],
            'device_type': row['device_type'],
            'last_seen': row['last_seen'],
            'last_heard': row.get('last_heard', row['last_seen']),
            'last_advert_timestamp': row.get('last_advert_timestamp'),
            'is_active': row['is_active'],
            'latitude': row['latitude'],
            'longitude': row['longitude'],
            'city': row['city'],
            'state': row['state'],
            'country': row['country'],
            'snr': row.get('snr'),
            'is_starred': row.get('is_starred', False),
        }
        for row in candidates
    ]
    selection = select_node_repeater(
        node_id, repeaters_data, node_ids, self._inference_config(),
        mesh_graph=getattr(self.bot, 'mesh_graph', None),
        db_manager=self.bot.db_manager,
        logger=self.logger,
        graph_n=getattr(self.bot, 'prefix_hex_chars', 2),
        sender_location=sender_location,
    )

    status = selection.status
    if status in ('resolved', 'single'):
        chosen = selection.repeater
        entry = {
            'name': chosen['name'],
            'public_key': chosen['public_key'],
            'device_type': chosen['device_type'],
            'last_seen': chosen['last_seen'],
            'is_active': chosen['is_active'],
            'found': True,
            'collision': False,
        }
        if status == 'resolved':
            # A choice was made between candidates (graph or geographic), so record
            # how and with what confidence. A 'single' match gets no indicator.
            entry['geographic_guess'] = (selection.method == 'geographic')
            entry['graph_guess'] = (selection.method == 'graph')
            entry['confidence'] = selection.confidence
        return entry

    if status == 'collision':
        # Low confidence or no selection method - show collision warning.
        return {
            'found': True,
            'collision': True,
            'matches': selection.matches,
            'node_id': node_id,
            'repeaters': selection.recent_repeaters,
        }

    # All repeaters filtered out (too old) - show as not found.
    return {'found': False, 'node_id': node_id}

def _resolve_from_device_contacts(self, node_id: str) -> dict[str, Any]:
    """Build the result entry for ``node_id`` from the device's own contact list.

    Only contacts that the repeater manager classifies as repeater devices are
    considered. More than one match is reported as a collision.
    """
    device_matches = []
    if hasattr(self.bot.meshcore, 'contacts'):
        for contact_key, contact_data in self.bot.meshcore.contacts.items():
            public_key = contact_data.get('public_key', contact_key)
            if not public_key_has_prefix(public_key, node_id):
                continue
            if hasattr(self.bot, 'repeater_manager') and self.bot.repeater_manager._is_repeater_device(contact_data):
                name = contact_data.get('adv_name', contact_data.get('name', self.translate('commands.path.unknown_name')))
                device_matches.append({
                    'name': name,
                    'public_key': public_key,
                    'device_type': contact_data.get('type', 'Unknown'),
                    'last_seen': 'Active',
                    'is_active': True,
                    'source': 'device'
                })

    if not device_matches:
        return {'found': False, 'node_id': node_id}

    if len(device_matches) > 1:
        # Multiple device matches - show collision warning.
        return {
            'found': True,
            'collision': True,
            'matches': len(device_matches),
            'node_id': node_id,
            'repeaters': device_matches
        }

    match = device_matches[0]
    return {
        'name': match['name'],
        'public_key': match['public_key'],
        'device_type': match['device_type'],
        'last_seen': match['last_seen'],
        'is_active': match['is_active'],
        'found': True,
        'collision': False,
        'source': 'device'
    }
async def _get_api_cache_data(self) -> Optional[dict[str, dict[str, Any]]]:
"""Get API cache data from the prefix command if available"""
try:
# Try to get the prefix command instance and its cache data
if hasattr(self.bot, 'command_manager'):
prefix_cmd = self.bot.command_manager.commands.get('prefix')
if prefix_cmd and hasattr(prefix_cmd, 'cache_data'):
# Check if cache is valid
current_time = time.time()
if current_time - prefix_cmd.cache_timestamp > prefix_cmd.cache_duration:
await prefix_cmd.refresh_cache()
return prefix_cmd.cache_data
except Exception as e:
self.logger.warning(f"Could not get API cache data: {e}")
return None
def _get_sender_location(self) -> Optional[tuple[float, float]]:
"""Get sender location from current message if available"""
try:
if not hasattr(self, '_current_message') or not self._current_message:
return None
sender_pubkey = self._current_message.sender_pubkey
if not sender_pubkey:
return None
# Look up sender location from database (any role, not just repeaters)
query = '''
SELECT latitude, longitude
FROM complete_contact_tracking
WHERE public_key = ?
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
'''
results = self.bot.db_manager.execute_query(query, (sender_pubkey,))
if results:
row = results[0]
return (row['latitude'], row['longitude'])
return None
except Exception as e:
self.logger.debug(f"Error getting sender location: {e}")
return None
def _filter_recent_repeaters(self, repeaters: list[dict[str, Any]], cutoff_hours: int = 24) -> list[dict[str, Any]]:
"""Filter repeaters to only include those that have advertised recently"""
from datetime import datetime, timedelta
recent_repeaters = []
cutoff_time = datetime.now() - timedelta(hours=cutoff_hours)
for repeater in repeaters:
# Check recency using multiple timestamp fields
is_recent = False
# Check last_heard from complete_contact_tracking
last_heard = repeater.get('last_heard')
if last_heard:
try:
if isinstance(last_heard, str):
last_heard_dt = datetime.fromisoformat(last_heard.replace('Z', '+00:00'))
else:
last_heard_dt = last_heard
is_recent = last_heard_dt > cutoff_time
except:
pass
# Check last_advert_timestamp if last_heard check failed
if not is_recent:
last_advert = repeater.get('last_advert_timestamp')
if last_advert:
try:
if isinstance(last_advert, str):
last_advert_dt = datetime.fromisoformat(last_advert.replace('Z', '+00:00'))
else:
last_advert_dt = last_advert
is_recent = last_advert_dt > cutoff_time
except:
pass
# Check last_seen from complete_contact_tracking table
if not is_recent:
last_seen = repeater.get('last_seen')
if last_seen:
try:
if isinstance(last_seen, str):
last_seen_dt = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
else:
last_seen_dt = last_seen
is_recent = last_seen_dt > cutoff_time
except:
pass
if is_recent:
recent_repeaters.append(repeater)
return recent_repeaters
def _select_repeater_by_graph(self, repeaters: list[dict[str, Any]], node_id: str,
                              path_context: list[str],
                              path_prefix_hex_chars: Optional[int] = None) -> tuple[Optional[dict[str, Any]], float, str]:
    """Pick a repeater for a colliding prefix using mesh-graph evidence.

    Forwards to the shared engine's ``select_repeater_by_graph`` with this
    command's current config snapshot and the bot's graph, database manager
    and logger. Kept as a method because the bot wiring and tests call it
    directly. When the path was decoded with multi-byte hops, pass
    ``path_prefix_hex_chars`` (e.g. 4 or 6) for candidate matching; graph
    lookups normalize to ``bot.prefix_hex_chars``.
    """
    bot = self.bot
    graph = getattr(bot, 'mesh_graph', None)
    graph_prefix_chars = getattr(bot, 'prefix_hex_chars', 2)
    config_snapshot = self._inference_config()
    return select_repeater_by_graph(
        repeaters,
        node_id,
        path_context,
        config_snapshot,
        mesh_graph=graph,
        db_manager=bot.db_manager,
        logger=self.logger,
        graph_n=graph_prefix_chars,
        path_prefix_hex_chars=path_prefix_hex_chars,
    )
def _format_path_response(self, node_ids: list[str], repeater_info: dict[str, dict[str, Any]]) -> str:
"""Format the path decode response
Maintains the order of repeaters as they appear in the path (first to last)
"""
# Build response lines in path order (first to last as message traveled)
lines = []
# Process nodes in path order (first to last as message traveled)
for node_id in node_ids:
info = repeater_info.get(node_id, {})
if info.get('found', False):
if info.get('collision', False):
# Multiple repeaters with same prefix
matches = info.get('matches', 0)
line = self.translate('commands.path.node_collision', node_id=node_id, matches=matches)
elif info.get('geographic_guess', False) or info.get('graph_guess', False):
# Geographic or graph-based selection
name = info.get('name', self.translate('commands.path.unknown_name'))
confidence = info.get('confidence', 0.0)
# Truncate name if too long
truncation = self.translate('commands.path.truncation')
name = self._truncate_to_byte_length(name, 20, truncation)
# Add confidence indicator
if confidence >= 0.9:
confidence_indicator = self.high_confidence_symbol
elif confidence >= 0.8:
confidence_indicator = self.medium_confidence_symbol
else:
confidence_indicator = self.low_confidence_symbol
# Use geographic translation key for backward compatibility, or add graph-specific if needed
line = self.translate('commands.path.node_geographic', node_id=node_id, name=name, confidence=confidence_indicator)
else:
# Single repeater found
name = info.get('name', self.translate('commands.path.unknown_name'))
truncation = self.translate('commands.path.truncation')
name = self._truncate_to_byte_length(name, 27, truncation)
line = self.translate('commands.path.node_format', node_id=node_id, name=name)
else:
# Unknown repeater
line = self.translate('commands.path.node_unknown', node_id=node_id)
line = self._truncate_to_byte_length(line, 150)
lines.append(line)
# Return all lines - let _send_path_response handle the splitting
return "\n".join(lines)
async def _send_path_response(self, message: MeshMessage, response: str):
    """Send the path response, breaking it into several messages when too long.

    The reply prefix from ``_format_path_reply_prefix`` goes on the first
    message only, so the first segment has a smaller byte budget. Splits
    happen on line boundaries. Each flushed segment gets the
    ``continuation_end`` marker, and each follow-up segment starts with the
    ``continuation_start`` text. Sends are spaced three seconds apart.
    """
    prefix = self._format_path_reply_prefix(message)
    self.last_response = (prefix + response) if prefix else response

    max_length = self.get_max_message_length(message)
    prefix_len = self._count_byte_length(prefix)
    # Never let the first segment's budget drop below one byte.
    first_segment_max = max(max_length - prefix_len, 1)

    # Fast path: everything fits in a single message.
    if self._count_byte_length(response) + prefix_len <= max_length:
        await self.send_response(message, prefix + response)
        return

    pending = ""
    sent = 0
    for line in response.split('\n'):
        budget = max_length if sent else first_segment_max
        # +1 accounts for the newline that would join the line to the segment.
        needed = self._count_byte_length(pending) + self._count_byte_length(line) + 1

        if needed <= budget:
            pending = f"{pending}\n{line}" if pending else line
            continue

        if pending:
            # Flush the segment built so far, marked as continued.
            pending += self.translate('commands.path.continuation_end')
            body = pending.rstrip()
            if sent == 0:
                body = prefix + body
            await self.send_response(
                message, body,
                skip_user_rate_limit=(sent > 0)
            )
            await asyncio.sleep(3.0)
            sent += 1

        # Start the next segment with the line that did not fit.
        if sent > 0:
            pending = self.translate('commands.path.continuation_start', line=line)
        else:
            pending = line

    if pending:
        tail = pending.rstrip()
        if sent == 0:
            tail = prefix + tail
        await self.send_response(message, tail, skip_user_rate_limit=True)
async def _extract_path_from_recent_messages(self) -> str:
"""Extract path from the current message's path information (same as test command).
Prefers already-extracted routing_info.path_nodes when present (multi-byte path support).
"""
try:
if not hasattr(self, '_current_message') or not self._current_message:
return self.translate('commands.path.no_path')
msg = self._current_message
# Prefer routing_info when present (no re-parsing; preserves bytes_per_hop)
routing_info = getattr(msg, 'routing_info', None)
if routing_info is not None:
path_length = routing_info.get('path_length', 0)
if path_length == 0:
return self.translate('commands.path.direct_connection')
path_nodes = routing_info.get('path_nodes', [])
if path_nodes:
node_ids = [n.upper() for n in path_nodes]
return await self._decode_node_ids(node_ids, routing_info)
# Fallback: parse message.path string (e.g. no routing_info or legacy path)
if not msg.path:
return self.translate('commands.path.no_path')
path_string = msg.path
if "Direct" in path_string or "0 hops" in path_string:
return self.translate('commands.path.direct_connection')
path_part = path_string.split(" via ROUTE_TYPE_")[0] if " via ROUTE_TYPE_" in path_string else path_string
if ',' in path_part:
return await self._decode_path(path_part, routing_info)
hex_pattern = rf'[0-9a-fA-F]{{{getattr(self.bot, "prefix_hex_chars", 2)}}}'
if re.search(hex_pattern, path_part):
return await self._decode_path(path_part, routing_info)
return self.translate('commands.path.path_prefix', path_string=path_string)
except Exception as e:
self.logger.error(f"Error extracting path from current message: {e}")
return self.translate('commands.path.error_extracting', error=str(e))
def _count_byte_length(self, text: str) -> int:
"""Count UTF-8 byte length of text. This matches the RF packet byte limit."""
return len(text.encode('utf-8'))
def _truncate_to_byte_length(self, text: str, max_bytes: int, ellipsis: str = "...") -> str:
"""Truncate text to fit within max UTF-8 byte length, never splitting multi-byte chars."""
text_bytes: bytes = text.encode('utf-8')
if len(text_bytes) <= max_bytes:
return text
ellipsis_bytes: bytes = ellipsis.encode('utf-8')
available: int = max_bytes - len(ellipsis_bytes)
if available <= 0:
return ellipsis
truncated: str = text_bytes[:available].decode('utf-8', errors='ignore')
return truncated + ellipsis
def get_help(self) -> str:
"""Get help text for the path command"""
return self.translate('commands.path.help')
def get_help_text(self) -> str:
"""Get help text for the path command (used by help system)"""
return self.get_help()