diff --git a/modules/commands/path_command.py b/modules/commands/path_command.py index 2c0b52d..d769399 100644 --- a/modules/commands/path_command.py +++ b/modules/commands/path_command.py @@ -4,18 +4,20 @@ Path Decode Command for the MeshCore Bot Decodes hex path data to show which repeaters were involved in message routing """ +import asyncio +import contextlib import re import time -import asyncio -from typing import List, Optional, Dict, Any, Tuple, Callable -from .base_command import BaseCommand +from typing import Any, Callable, Optional + from ..models import MeshMessage from ..utils import calculate_distance, parse_path_string +from .base_command import BaseCommand class PathCommand(BaseCommand): """Command for decoding path data to repeater names""" - + # Plugin metadata name = "path" keywords = ["path", "decode", "route"] @@ -23,39 +25,43 @@ class PathCommand(BaseCommand): requires_dm = False cooldown_seconds = 1 category = "meshcore_info" - + # Documentation short_description = "Decode path data to show repeaters involved in message routing" usage = "path [hex_data]" examples = ["path", "decode"] - + def __init__(self, bot): super().__init__(bot) self.path_enabled = self.get_config_value('Path_Command', 'enabled', fallback=True, value_type='bool') + # Explicit config toggle; set False to disable even when bot lat/lon are configured + self.geographic_scoring_config_enabled = bot.config.getboolean( + 'Path_Command', 'geographic_scoring_enabled', fallback=True + ) # Get bot location from config for geographic proximity calculations # Check if geographic guessing is enabled (bot has location configured) self.geographic_guessing_enabled = False self.bot_latitude = None self.bot_longitude = None - + # Get proximity calculation method from config self.proximity_method = bot.config.get('Path_Command', 'proximity_method', fallback='simple') self.path_proximity_fallback = bot.config.getboolean('Path_Command', 'path_proximity_fallback', fallback=True) self.max_proximity_range = bot.config.getfloat('Path_Command', 'max_proximity_range', fallback=200.0) self.max_repeater_age_days = bot.config.getint('Path_Command', 'max_repeater_age_days', fallback=14) - + # Get recency/proximity weighting (0.0 to 1.0, where 1.0 = 100% recency, 0.0 = 100% proximity) # Default 0.4 means 40% recency, 60% proximity (more balanced for path routing) recency_weight = bot.config.getfloat('Path_Command', 'recency_weight', fallback=0.4) self.recency_weight = max(0.0, min(1.0, recency_weight)) # Clamp to 0.0-1.0 self.proximity_weight = 1.0 - self.recency_weight - + # Get recency decay half-life for longer advert intervals (default: 12 hours, suggested: 36-48 for 48-72 hour intervals) self.recency_decay_half_life_hours = bot.config.getfloat('Path_Command', 'recency_decay_half_life_hours', fallback=12.0) - + # Check for preset first, then apply individual settings (preset can be overridden) preset = bot.config.get('Path_Command', 'path_selection_preset', fallback='balanced').lower() - + # Apply preset defaults, then individual settings override if preset == 'geographic': # Prioritize geographic proximity @@ -75,11 +81,11 @@ class PathCommand(BaseCommand): preset_distance_threshold = 30.0 preset_distance_penalty = 0.3 preset_final_hop_weight = 0.25 - + # Graph-based validation settings self.graph_based_validation = bot.config.getboolean('Path_Command', 'graph_based_validation', fallback=True) self.min_edge_observations = bot.config.getint('Path_Command', 'min_edge_observations', fallback=3) - + # Enhanced graph features self.graph_use_bidirectional = bot.config.getboolean('Path_Command', 'graph_use_bidirectional', fallback=True) self.graph_use_hop_position = bot.config.getboolean('Path_Command', 'graph_use_hop_position', fallback=True) @@ -92,14 +98,14 @@ class PathCommand(BaseCommand): self.graph_confidence_override_threshold = bot.config.getfloat('Path_Command', 'graph_confidence_override_threshold', fallback=preset_graph_confidence_threshold) self.graph_confidence_override_threshold = max(0.0, min(1.0, self.graph_confidence_override_threshold)) # Clamp to 0.0-1.0 self.graph_distance_penalty_enabled = bot.config.getboolean('Path_Command', 'graph_distance_penalty_enabled', fallback=True) - + self.graph_max_reasonable_hop_distance_km = bot.config.getfloat('Path_Command', 'graph_max_reasonable_hop_distance_km', fallback=preset_distance_threshold) self.graph_distance_penalty_strength = bot.config.getfloat('Path_Command', 'graph_distance_penalty_strength', fallback=preset_distance_penalty) self.graph_distance_penalty_strength = max(0.0, min(1.0, self.graph_distance_penalty_strength)) # Clamp to 0.0-1.0 self.graph_zero_hop_bonus = bot.config.getfloat('Path_Command', 'graph_zero_hop_bonus', fallback=0.4) self.graph_zero_hop_bonus = max(0.0, min(1.0, self.graph_zero_hop_bonus)) # Clamp to 0.0-1.0 self.graph_prefer_stored_keys = bot.config.getboolean('Path_Command', 'graph_prefer_stored_keys', fallback=True) - + # Final hop proximity settings for graph selection # Defaults based on LoRa ranges: typical < 30km, long up to 200km, very close < 10km self.graph_final_hop_proximity_enabled = bot.config.getboolean('Path_Command', 'graph_final_hop_proximity_enabled', fallback=True) @@ -114,37 +120,40 @@ class PathCommand(BaseCommand): self.graph_path_validation_max_bonus = bot.config.getfloat('Path_Command', 'graph_path_validation_max_bonus', fallback=0.3) self.graph_path_validation_max_bonus = max(0.0, min(1.0, self.graph_path_validation_max_bonus)) # Clamp to 0.0-1.0 self.graph_path_validation_obs_divisor = bot.config.getfloat('Path_Command', 'graph_path_validation_obs_divisor', fallback=50.0) - + # Get star bias multiplier (how much to boost starred repeaters' scores) # Default 2.5 means starred repeaters get 2.5x their normal score self.star_bias_multiplier = bot.config.getfloat('Path_Command', 'star_bias_multiplier', fallback=2.5) self.star_bias_multiplier = max(1.0, self.star_bias_multiplier) # Ensure at least 1.0 - + # Get confidence indicator symbols from config self.high_confidence_symbol = bot.config.get('Path_Command', 'high_confidence_symbol', fallback='🎯') self.medium_confidence_symbol = bot.config.get('Path_Command', 'medium_confidence_symbol', fallback='📍') self.low_confidence_symbol = bot.config.get('Path_Command', 'low_confidence_symbol', fallback='❓') - + # Check if "p" shortcut is enabled (on by default) self.enable_p_shortcut = bot.config.getboolean('Path_Command', 'enable_p_shortcut', fallback=True) if self.enable_p_shortcut: # Add "p" to keywords if enabled if "p" not in self.keywords: self.keywords.append("p") - + try: # Try to get location from Bot section if bot.config.has_section('Bot'): lat = bot.config.getfloat('Bot', 'bot_latitude', fallback=None) lon = bot.config.getfloat('Bot', 'bot_longitude', fallback=None) - + if lat is not None and lon is not None: # Validate coordinates if -90 <= lat <= 90 and -180 <= lon <= 180: self.bot_latitude = lat self.bot_longitude = lon - self.geographic_guessing_enabled = True - self.logger.info(f"Geographic proximity guessing enabled with bot location: {lat:.4f}, {lon:.4f}") + if self.geographic_scoring_config_enabled: + self.geographic_guessing_enabled = True + self.logger.info(f"Geographic proximity guessing enabled with bot location: {lat:.4f}, {lon:.4f}") + else: + self.logger.info("Geographic proximity guessing disabled via config (geographic_scoring_enabled = false)") self.logger.info(f"Proximity method: {self.proximity_method}") self.logger.info(f"Max repeater age: {self.max_repeater_age_days} days") else: @@ -155,55 +164,51 @@ class PathCommand(BaseCommand): self.logger.info("Bot section not found - geographic proximity guessing disabled") except Exception as e: self.logger.warning(f"Error reading bot location from config: {e} - geographic proximity guessing disabled") - + def can_execute(self, message: MeshMessage) -> bool: """Check if this command can be executed with the given message. - + Args: message: The message triggering the command. - + Returns: bool: True if command is enabled and checks pass, False otherwise. """ if not self.path_enabled: return False return super().can_execute(message) - + def matches_keyword(self, message: MeshMessage) -> bool: """Check if message starts with 'path' keyword or 'p' shortcut (if enabled)""" content = message.content.strip() - + # Handle exclamation prefix if content.startswith('!'): content = content[1:].strip() - + content_lower = content.lower() - + # Handle "p" shortcut if enabled if self.enable_p_shortcut: if content_lower == "p": return True # Just "p" by itself elif (content.startswith('p ') or content.startswith('P ')) and len(content) > 2: return True # "p " followed by path data - + # Check if message starts with any of our keywords - for keyword in self.keywords: - # Check for exact match or keyword followed by space - if content_lower == keyword or content_lower.startswith(keyword + ' '): - return True - return False - + return any(content_lower == keyword or content_lower.startswith(keyword + ' ') for keyword in self.keywords) + async def execute(self, message: MeshMessage) -> bool: """Execute path decode command""" self.logger.info(f"Path command executed with content: {message.content}") - + # Store the current message for use in _extract_path_from_recent_messages self._current_message = message - + # Parse the message content to extract path data content = message.content.strip() parts = content.split() - + if len(parts) < 2: # No arguments provided - try to extract path from current message response = await self._extract_path_from_recent_messages() @@ -211,11 +216,11 @@ class PathCommand(BaseCommand): # Extract path data from the command path_input = " ".join(parts[1:]) response = await self._decode_path(path_input) - + # Send the response (may be split into multiple messages if long) await self._send_path_response(message, response) return True - + async def _decode_path(self, path_input: str) -> str: """Decode hex path data to repeater names. Comma-separated tokens infer hop size (2, 4, or 6 hex chars per node). @@ -253,12 +258,12 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.error(f"Error decoding path: {e}") return self.translate('commands.path.error_decoding', error=str(e)) - + async def _lookup_repeater_names( self, - node_ids: List[str], - lookup_func: Optional[Callable[[str], List[Dict[str, Any]]]] = None, - ) -> Dict[str, Dict[str, Any]]: + node_ids: list[str], + lookup_func: Optional[Callable[[str], list[dict[str, Any]]]] = None, + ) -> dict[str, dict[str, Any]]: """Look up repeater names for given node IDs. Args: @@ -271,7 +276,6 @@ class PathCommand(BaseCommand): try: # Skip API cache for path decoding - use database with improved proximity logic # API cache doesn't have recency-based proximity selection needed for path decoding - api_data = None # Query the database for repeaters with matching prefixes # Node IDs are the configured prefix of the public key (see Bot.prefix_bytes) @@ -311,7 +315,7 @@ class PathCommand(BaseCommand): try: # Get repeater devices from complete database (repeaters and roomservers) complete_db = await self.bot.repeater_manager.get_repeater_devices(include_historical=True) - + for row in complete_db: if row['public_key'].startswith(node_id): results.append({ @@ -337,25 +341,25 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.debug(f"Error getting complete database: {e}") results = [] - + # If complete tracking database failed, try direct query to complete_contact_tracking if not results: try: # Build query with age filtering if configured # Use last_advert_timestamp if available, otherwise fall back to last_heard if self.max_repeater_age_days > 0: - query = ''' + query = f''' SELECT name, public_key, device_type, last_heard, last_heard as last_seen, last_advert_timestamp, latitude, longitude, city, state, country, advert_count, signal_strength, snr, hop_count, role, is_starred FROM complete_contact_tracking WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver') AND ( - (last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{} days')) - OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{} days')) + (last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{self.max_repeater_age_days} days')) + OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{self.max_repeater_age_days} days')) ) ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC - '''.format(self.max_repeater_age_days, self.max_repeater_age_days) + ''' else: query = ''' SELECT name, public_key, device_type, last_heard, last_heard as last_seen, @@ -365,10 +369,10 @@ class PathCommand(BaseCommand): WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver') ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC ''' - + prefix_pattern = f"{node_id}%" results = self.bot.db_manager.execute_query(query, (prefix_pattern,)) - + # Convert results to expected format if results: results = [ @@ -396,7 +400,7 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.debug(f"Error querying complete_contact_tracking directly: {e}") results = [] - + if results: # Build repeaters_data with all necessary fields repeaters_data = [ @@ -417,13 +421,13 @@ class PathCommand(BaseCommand): 'is_starred': row.get('is_starred', False) # Include star status for bias } for row in results ] - + # Filter out repeaters with very low recency scores BEFORE collision detection # This prevents old repeaters from causing false collisions scored_repeaters = self._calculate_recency_weighted_scores(repeaters_data) min_recency_threshold = 0.01 # Approximately 55 hours ago or less recent_repeaters = [r for r, score in scored_repeaters if score >= min_recency_threshold] - + # Check for ID collisions (multiple repeaters with same prefix) AFTER filtering if len(recent_repeaters) > 1: # Multiple recent matches - try graph-based validation first, then geographic proximity @@ -434,14 +438,14 @@ class PathCommand(BaseCommand): graph_confidence = 0.0 geo_repeater = None geo_confidence = 0.0 - + # Try graph-based selection if enabled if self.graph_based_validation and hasattr(self.bot, 'mesh_graph') and self.bot.mesh_graph: path_prefix_hex_chars = len(node_id) graph_repeater, graph_confidence, selection_method = self._select_repeater_by_graph( recent_repeaters, node_id, node_ids, path_prefix_hex_chars=path_prefix_hex_chars ) - + # Get geographic proximity selection if self.geographic_guessing_enabled: # Get sender location if available (for first repeater selection) @@ -449,23 +453,23 @@ class PathCommand(BaseCommand): geo_repeater, geo_confidence = self._select_repeater_by_proximity( recent_repeaters, node_id, node_ids, sender_location ) - + # Helper function to check if repeater has valid location data def has_valid_location(repeater): lat = repeater.get('latitude') lon = repeater.get('longitude') - return (lat is not None and lon is not None and + return (lat is not None and lon is not None and not (lat == 0.0 and lon == 0.0)) - + # Check if this is the final hop (last node in path) is_final_hop = (node_id == node_ids[-1] if node_ids else False) - + # Combine or choose between graph and geographic based on config if self.graph_geographic_combined and graph_repeater and geo_repeater: # Only combine if both methods selected the same repeater graph_pubkey = graph_repeater.get('public_key', '') geo_pubkey = geo_repeater.get('public_key', '') - + if graph_pubkey and geo_pubkey and graph_pubkey == geo_pubkey: # Same repeater - combine scores with weighted average combined_confidence = ( @@ -521,7 +525,7 @@ class PathCommand(BaseCommand): selected_repeater = graph_repeater confidence = graph_confidence selection_method = selection_method or 'graph' - + if selected_repeater and confidence >= 0.5: # High confidence selection (graph or geographic) repeater_info[node_id] = { @@ -581,7 +585,7 @@ class PathCommand(BaseCommand): 'is_active': True, 'source': 'device' }) - + if device_matches: if len(device_matches) > 1: # Multiple device matches - show collision warning @@ -610,7 +614,7 @@ class PathCommand(BaseCommand): 'found': False, 'node_id': node_id } - + except Exception as e: self.logger.error(f"Error looking up repeater names: {e}") # Return basic info for all nodes @@ -620,10 +624,10 @@ class PathCommand(BaseCommand): 'node_id': node_id, 'error': str(e) } - + return repeater_info - - async def _get_api_cache_data(self) -> Optional[Dict[str, Dict[str, Any]]]: + + async def _get_api_cache_data(self) -> Optional[dict[str, dict[str, Any]]]: """Get API cache data from the prefix command if available""" try: # Try to get the prefix command instance and its cache data @@ -638,31 +642,31 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.warning(f"Could not get API cache data: {e}") return None - - - def _get_sender_location(self) -> Optional[Tuple[float, float]]: + + + def _get_sender_location(self) -> Optional[tuple[float, float]]: """Get sender location from current message if available""" try: if not hasattr(self, '_current_message') or not self._current_message: return None - + sender_pubkey = self._current_message.sender_pubkey if not sender_pubkey: return None - + # Look up sender location from database (any role, not just repeaters) query = ''' - SELECT latitude, longitude - FROM complete_contact_tracking - WHERE public_key = ? + SELECT latitude, longitude + FROM complete_contact_tracking + WHERE public_key = ? AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0 ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC LIMIT 1 ''' - + results = self.bot.db_manager.execute_query(query, (sender_pubkey,)) - + if results: row = results[0] return (row['latitude'], row['longitude']) @@ -670,28 +674,28 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.debug(f"Error getting sender location: {e}") return None - - def _select_repeater_by_proximity(self, repeaters: List[Dict[str, Any]], node_id: str = None, path_context: List[str] = None, sender_location: Optional[Tuple[float, float]] = None) -> Tuple[Optional[Dict[str, Any]], float]: + + def _select_repeater_by_proximity(self, repeaters: list[dict[str, Any]], node_id: str = None, path_context: list[str] = None, sender_location: Optional[tuple[float, float]] = None) -> tuple[Optional[dict[str, Any]], float]: """ Select the most likely repeater based on geographic proximity. - + Args: repeaters: List of repeaters to choose from node_id: The current node ID being processed path_context: Full path for context (for path proximity method) sender_location: Optional sender location (for first repeater selection) - + Returns: Tuple of (selected_repeater, confidence_score) confidence_score: 0.0 to 1.0, where 1.0 is very confident """ if not repeaters: return None, 0.0 - + # Check if geographic guessing is enabled if not self.geographic_guessing_enabled: return None, 0.0 - + # Filter repeaters that have location data repeaters_with_location = [] for repeater in repeaters: @@ -701,11 +705,11 @@ class PathCommand(BaseCommand): # Skip 0,0 coordinates (hidden location) if not (lat == 0.0 and lon == 0.0): repeaters_with_location.append(repeater) - + # If no repeaters have location data, we can't make a geographic guess if not repeaters_with_location: return None, 0.0 - + # Choose proximity calculation method if self.proximity_method == 'path' and path_context and node_id: result = self._select_by_path_proximity(repeaters_with_location, node_id, path_context, sender_location) @@ -718,21 +722,21 @@ class PathCommand(BaseCommand): return None, 0.0 else: return self._select_by_simple_proximity(repeaters_with_location) - - def _select_by_simple_proximity(self, repeaters_with_location: List[Dict[str, Any]]) -> Tuple[Optional[Dict[str, Any]], float]: + + def _select_by_simple_proximity(self, repeaters_with_location: list[dict[str, Any]]) -> tuple[Optional[dict[str, Any]], float]: """Select repeater based on proximity to bot location with strong recency bias""" # Calculate recency-weighted scores for all repeaters scored_repeaters = self._calculate_recency_weighted_scores(repeaters_with_location) - + # Filter out repeaters with very low recency scores (too old to be considered) # Minimum recency score threshold: 0.01 (approximately 55 hours ago or less) # This prevents selecting repeaters that haven't advertised in several days min_recency_threshold = 0.01 scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= min_recency_threshold] - + if not scored_repeaters: return None, 0.0 # No recent repeaters found - + # If only one repeater, check if it's within range if len(scored_repeaters) == 1: repeater, recency_score = scored_repeaters[0] @@ -743,11 +747,11 @@ class PathCommand(BaseCommand): # Apply maximum range threshold if self.max_proximity_range > 0 and distance > self.max_proximity_range: return None, 0.0 # Reject if beyond maximum range - + # Confidence based on recency score base_confidence = 0.4 + (recency_score * 0.5) # 0.4 to 0.9 based on recency return repeater, base_confidence - + # Calculate combined proximity + recency scores combined_scores = [] for repeater, recency_score in scored_repeaters: @@ -755,24 +759,24 @@ class PathCommand(BaseCommand): self.bot_latitude, self.bot_longitude, repeater['latitude'], repeater['longitude'] ) - + # Apply maximum range threshold if self.max_proximity_range > 0 and distance > self.max_proximity_range: continue # Skip if beyond maximum range - + # Combined score: proximity (lower is better) + recency (higher is better) # Normalize distance to 0-1 scale (assuming max 1000km range) normalized_distance = min(distance / 1000.0, 1.0) proximity_score = 1.0 - normalized_distance # Invert so closer = higher score - + # Use configurable weighting (default: 40% recency, 60% proximity) combined_score = (recency_score * self.recency_weight) + (proximity_score * self.proximity_weight) - + # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") - + # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred snr = repeater.get('snr') @@ -781,24 +785,24 @@ class PathCommand(BaseCommand): snr_bonus = combined_score * 0.2 combined_score += snr_bonus self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") - + combined_scores.append((combined_score, distance, repeater)) - + if not combined_scores: return None, 0.0 # All repeaters beyond range - + # Sort by combined score (highest first) combined_scores.sort(key=lambda x: x[0], reverse=True) - + best_score, best_distance, best_repeater = combined_scores[0] - + # Calculate confidence based on score difference if len(combined_scores) == 1: confidence = 0.4 + (best_score * 0.5) # 0.4 to 0.9 based on score else: second_best_score = combined_scores[1][0] score_ratio = best_score / second_best_score if second_best_score > 0 else 1.0 - + # Higher confidence if there's a significant score difference if score_ratio > 1.5: # Best is 50% better than second confidence = 0.9 @@ -812,20 +816,20 @@ class PathCommand(BaseCommand): selected_repeater = self._apply_tie_breakers(distances_for_tiebreaker) confidence = 0.5 # Moderate confidence for tie-breaker selection return selected_repeater, confidence - + return best_repeater, confidence - - def _calculate_recency_weighted_scores(self, repeaters: List[Dict[str, Any]]) -> List[Tuple[Dict[str, Any], float]]: + + def _calculate_recency_weighted_scores(self, repeaters: list[dict[str, Any]]) -> list[tuple[dict[str, Any], float]]: """Calculate recency-weighted scores for all repeaters (0.0 to 1.0, higher = more recent)""" - from datetime import datetime, timedelta - + from datetime import datetime + scored_repeaters = [] now = datetime.now() - + for repeater in repeaters: # Get the most recent timestamp from multiple fields most_recent_time = None - + # Check last_heard from complete_contact_tracking last_heard = repeater.get('last_heard') if last_heard: @@ -838,7 +842,7 @@ class PathCommand(BaseCommand): most_recent_time = dt except: pass - + # Check last_advert_timestamp last_advert = repeater.get('last_advert_timestamp') if last_advert: @@ -851,7 +855,7 @@ class PathCommand(BaseCommand): most_recent_time = dt except: pass - + # Check last_seen from complete_contact_tracking table last_seen = repeater.get('last_seen') if last_seen: @@ -864,14 +868,14 @@ class PathCommand(BaseCommand): most_recent_time = dt except: pass - + if most_recent_time is None: # No timestamp found, give very low score recency_score = 0.1 else: # Calculate recency score using exponential decay hours_ago = (now - most_recent_time).total_seconds() / 3600.0 - + # Strong recency bias: recent devices get high scores, older devices get exponentially lower scores # Score = e^(-hours/half_life) - configurable half-life for longer advert intervals # With default 12-hour half-life: @@ -886,28 +890,28 @@ class PathCommand(BaseCommand): # - 72 hours ago: ~0.14 import math recency_score = math.exp(-hours_ago / self.recency_decay_half_life_hours) - + # Ensure score is between 0.0 and 1.0 recency_score = max(0.0, min(1.0, recency_score)) - + scored_repeaters.append((repeater, recency_score)) - + # Sort by recency score (highest first) scored_repeaters.sort(key=lambda x: x[1], reverse=True) - + return scored_repeaters - - def _filter_recent_repeaters(self, repeaters: List[Dict[str, Any]], cutoff_hours: int = 24) -> List[Dict[str, Any]]: + + def _filter_recent_repeaters(self, repeaters: list[dict[str, Any]], cutoff_hours: int = 24) -> list[dict[str, Any]]: """Filter repeaters to only include those that have advertised recently""" from datetime import datetime, timedelta - + recent_repeaters = [] cutoff_time = datetime.now() - timedelta(hours=cutoff_hours) - + for repeater in repeaters: # Check recency using multiple timestamp fields is_recent = False - + # Check last_heard from complete_contact_tracking last_heard = repeater.get('last_heard') if last_heard: @@ -919,7 +923,7 @@ class PathCommand(BaseCommand): is_recent = last_heard_dt > cutoff_time except: pass - + # Check last_advert_timestamp if last_heard check failed if not is_recent: last_advert = repeater.get('last_advert_timestamp') @@ -932,7 +936,7 @@ class PathCommand(BaseCommand): is_recent = last_advert_dt > cutoff_time except: pass - + # Check last_seen from complete_contact_tracking table if not is_recent: last_seen = repeater.get('last_seen') @@ -945,32 +949,32 @@ class PathCommand(BaseCommand): is_recent = last_seen_dt > cutoff_time except: pass - + if is_recent: recent_repeaters.append(repeater) - + return recent_repeaters - def _apply_tie_breakers(self, distances: List[Tuple[float, Dict[str, Any]]]) -> Dict[str, Any]: + def _apply_tie_breakers(self, distances: list[tuple[float, dict[str, Any]]]) -> dict[str, Any]: """Apply tie-breaker strategies when repeaters have identical coordinates""" from datetime import datetime - + # Get all repeaters with the same (minimum) distance min_distance = distances[0][0] tied_repeaters = [repeater for distance, repeater in distances if distance == min_distance] - + # Tie-breaker 1: Prefer active repeaters active_repeaters = [r for r in tied_repeaters if r.get('is_active', True)] if len(active_repeaters) == 1: return active_repeaters[0] elif len(active_repeaters) > 1: tied_repeaters = active_repeaters - + # Tie-breaker 2: Prefer repeaters with more recent activity (enhanced recency check) def get_recent_timestamp(repeater): """Get the most recent timestamp from multiple fields""" timestamps = [] - + # Check last_heard from complete_contact_tracking last_heard = repeater.get('last_heard') if last_heard: @@ -982,7 +986,7 @@ class PathCommand(BaseCommand): timestamps.append(dt) except: pass - + # Check last_advert_timestamp last_advert = repeater.get('last_advert_timestamp') if last_advert: @@ -994,7 +998,7 @@ class PathCommand(BaseCommand): timestamps.append(dt) except: pass - + # Check last_seen from complete_contact_tracking table last_seen = repeater.get('last_seen') if last_seen: @@ -1006,60 +1010,58 @@ class PathCommand(BaseCommand): timestamps.append(dt) except: pass - + # Return the most recent timestamp, or epoch if none found if timestamps: return max(timestamps) else: return datetime.min # Use epoch as fallback - + try: # Sort by most recent activity (more recent first) tied_repeaters.sort(key=get_recent_timestamp, reverse=True) except: pass # If sorting fails, continue with next tie-breaker - + # Tie-breaker 3: Prefer repeaters with higher advertisement count (more active) - try: + with contextlib.suppress(BaseException): tied_repeaters.sort(key=lambda r: r.get('advert_count', 0), reverse=True) - except: - pass - + # Tie-breaker 4: Alphabetical order (deterministic) tied_repeaters.sort(key=lambda r: r.get('name', '')) - + return tied_repeaters[0] - - def _select_by_path_proximity(self, repeaters_with_location: List[Dict[str, Any]], node_id: str, path_context: List[str], sender_location: Optional[Tuple[float, float]] = None) -> Tuple[Optional[Dict[str, Any]], float]: + + def _select_by_path_proximity(self, repeaters_with_location: list[dict[str, Any]], node_id: str, path_context: list[str], sender_location: Optional[tuple[float, float]] = None) -> tuple[Optional[dict[str, Any]], float]: """Select repeater based on proximity to previous/next nodes in path""" try: # Filter out repeaters with very low recency scores first scored_repeaters = self._calculate_recency_weighted_scores(repeaters_with_location) min_recency_threshold = 0.01 # Approximately 55 hours ago or less recent_repeaters = [r for r, score in scored_repeaters if score >= min_recency_threshold] - + if not recent_repeaters: return None, 0.0 # No recent repeaters found - + # Find current node position in path current_index = path_context.index(node_id) if node_id in path_context else -1 if current_index == -1: return None, 0.0 - + # Get previous and next node locations prev_location = None next_location = None - + # Get previous node location if current_index > 0: prev_node_id = path_context[current_index - 1] prev_location = self._get_node_location(prev_node_id) - - # Get next node location + + # Get next node location if current_index < len(path_context) - 1: next_node_id = path_context[current_index + 1] next_location = self._get_node_location(next_node_id) - + # For the first repeater in the path, prioritize sender location as the source # The first repeater's primary job is to receive from the sender, so use sender location if available is_first_repeater = (current_index == 0) @@ -1067,7 +1069,7 @@ class PathCommand(BaseCommand): # For first repeater, use sender location only (not averaged with next node) self.logger.debug(f"Using sender location for proximity calculation of first repeater: {sender_location[0]:.4f}, {sender_location[1]:.4f}") return self._select_by_single_proximity(recent_repeaters, sender_location, "sender") - + # For the last repeater in the path, prioritize bot location as the destination # The last repeater's primary job is to deliver to the bot, so use bot location only is_last_repeater = (current_index == len(path_context) - 1) @@ -1077,7 +1079,7 @@ class PathCommand(BaseCommand): bot_location = (self.bot_latitude, self.bot_longitude) self.logger.debug(f"Using bot location for proximity calculation of last repeater: {self.bot_latitude:.4f}, {self.bot_longitude:.4f}") return self._select_by_single_proximity(recent_repeaters, bot_location, "bot") - + # For non-first/non-last repeaters, use both previous and next locations if available # If we have both previous and next locations, use both for proximity if prev_location and next_location: @@ -1088,40 +1090,40 @@ class PathCommand(BaseCommand): return self._select_by_single_proximity(recent_repeaters, next_location, "next") else: return None, 0.0 - + except Exception as e: self.logger.warning(f"Error in path proximity calculation: {e}") return None, 0.0 - - def _get_node_location(self, node_id: str) -> Optional[Tuple[float, float]]: + + def _get_node_location(self, node_id: str) -> Optional[tuple[float, float]]: """Get location for a node ID from the complete_contact_tracking database""" try: # Build query with age filtering if configured # Use last_advert_timestamp if available, otherwise fall back to last_heard if self.max_repeater_age_days > 0: - query = ''' - SELECT latitude, longitude, is_starred FROM complete_contact_tracking + query = f''' + SELECT latitude, longitude, is_starred FROM complete_contact_tracking WHERE public_key LIKE ? AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0 AND role IN ('repeater', 'roomserver') AND ( - (last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{} days')) - OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{} days')) + (last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{self.max_repeater_age_days} days')) + OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{self.max_repeater_age_days} days')) ) ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC LIMIT 1 - '''.format(self.max_repeater_age_days, self.max_repeater_age_days) + ''' else: query = ''' - SELECT latitude, longitude, is_starred FROM complete_contact_tracking + SELECT latitude, longitude, is_starred FROM complete_contact_tracking WHERE public_key LIKE ? AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0 AND role IN ('repeater', 'roomserver') ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC LIMIT 1 ''' - + prefix_pattern = f"{node_id}%" results = self.bot.db_manager.execute_query(query, (prefix_pattern,)) - + if results: row = results[0] return (row['latitude'], row['longitude']) @@ -1129,48 +1131,48 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.warning(f"Error getting location for node {node_id}: {e}") return None - - def _select_by_dual_proximity(self, repeaters: List[Dict[str, Any]], prev_location: Tuple[float, float], next_location: Tuple[float, float]) -> Tuple[Optional[Dict[str, Any]], float]: + + def _select_by_dual_proximity(self, repeaters: list[dict[str, Any]], prev_location: tuple[float, float], next_location: tuple[float, float]) -> tuple[Optional[dict[str, Any]], float]: """Select repeater based on proximity to both previous and next nodes with strong recency bias""" # Calculate recency-weighted scores for all repeaters scored_repeaters = self._calculate_recency_weighted_scores(repeaters) - + # Filter out repeaters with very low recency scores min_recency_threshold = 0.01 # Approximately 55 hours ago or less scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= min_recency_threshold] - + if not scored_repeaters: return None, 0.0 # No recent repeaters found - + best_repeater = None best_combined_score = 0.0 - + for repeater, recency_score in scored_repeaters: # Calculate distance to previous node prev_distance = calculate_distance( prev_location[0], prev_location[1], repeater['latitude'], repeater['longitude'] ) - + # Calculate distance to next node next_distance = calculate_distance( next_location[0], next_location[1], repeater['latitude'], repeater['longitude'] ) - + # Combined proximity score (lower distance = higher score) avg_distance = (prev_distance + next_distance) / 2 normalized_distance = min(avg_distance / 1000.0, 1.0) proximity_score = 1.0 - normalized_distance - + # Use configurable weighting (default: 40% recency, 60% proximity) combined_score = (recency_score * self.recency_weight) + (proximity_score * self.proximity_weight) - + # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") - + # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred snr = repeater.get('snr') @@ -1179,11 +1181,11 @@ class PathCommand(BaseCommand): snr_bonus = combined_score * 0.2 combined_score += snr_bonus self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") - + if combined_score > best_combined_score: best_combined_score = combined_score best_repeater = repeater - + if best_repeater: # Apply maximum range threshold if self.max_proximity_range > 0: @@ -1198,25 +1200,25 @@ class PathCommand(BaseCommand): ) if prev_dist > self.max_proximity_range or next_dist > self.max_proximity_range: return None, 0.0 # Reject if beyond maximum range - + # Confidence based on combined score confidence = 0.4 + (best_combined_score * 0.5) # 0.4 to 0.9 based on score return best_repeater, confidence - + return None, 0.0 - - def _select_by_single_proximity(self, repeaters: List[Dict[str, Any]], reference_location: Tuple[float, float], direction: str) -> Tuple[Optional[Dict[str, Any]], float]: + + def _select_by_single_proximity(self, repeaters: list[dict[str, Any]], reference_location: tuple[float, float], direction: str) -> tuple[Optional[dict[str, Any]], float]: """Select repeater based on proximity to single reference node with strong recency bias""" # Calculate recency-weighted scores for all repeaters scored_repeaters = self._calculate_recency_weighted_scores(repeaters) - + # Filter out repeaters with very low recency scores min_recency_threshold = 0.01 # Approximately 55 hours ago or less scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= min_recency_threshold] - + if not scored_repeaters: return None, 0.0 # No recent repeaters found - + # For last repeater (direction="bot") or first repeater (direction="sender"), use 100% proximity (0% recency) # The final hop to the bot and first hop from sender should prioritize distance above all else # Recency still matters for filtering (min_recency_threshold), but not for scoring @@ -1227,33 +1229,33 @@ class PathCommand(BaseCommand): # Use configurable weighting for other cases (from config: recency_weight, proximity_weight) proximity_weight = self.proximity_weight recency_weight = self.recency_weight - + best_repeater = None best_combined_score = 0.0 all_scores = [] # For debug logging - + for repeater, recency_score in scored_repeaters: distance = calculate_distance( reference_location[0], reference_location[1], repeater['latitude'], repeater['longitude'] ) - + # Apply maximum range threshold if self.max_proximity_range > 0 and distance > self.max_proximity_range: continue # Skip if beyond maximum range - + # Proximity score (closer = higher score) normalized_distance = min(distance / 1000.0, 1.0) proximity_score = 1.0 - normalized_distance - + # Use appropriate weighting based on direction combined_score = (recency_score * recency_weight) + (proximity_score * proximity_weight) - + # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") - + # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred snr = repeater.get('snr') @@ -1262,29 +1264,29 @@ class PathCommand(BaseCommand): snr_bonus = combined_score * 0.2 combined_score += snr_bonus self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") - + all_scores.append((repeater.get('name', 'unknown'), distance, recency_score, proximity_score, combined_score)) - + if combined_score > best_combined_score: best_combined_score = combined_score best_repeater = repeater - + # Debug logging for last repeater selection if direction == "bot" and all_scores: self.logger.debug(f"Last repeater selection scores (proximity_weight={proximity_weight:.1%}, recency_weight={recency_weight:.1%}):") for name, dist, rec, prox, combined in sorted(all_scores, key=lambda x: x[4], reverse=True): self.logger.debug(f" {name}: distance={dist:.1f}km, recency={rec:.3f}, proximity={prox:.3f}, combined={combined:.3f}") - + if best_repeater: # Confidence based on combined score confidence = 0.4 + (best_combined_score * 0.5) # 0.4 to 0.9 based on score return best_repeater, confidence - + return None, 0.0 - - def _select_repeater_by_graph(self, repeaters: List[Dict[str, Any]], node_id: str, - path_context: List[str], - path_prefix_hex_chars: Optional[int] = None) -> Tuple[Optional[Dict[str, Any]], float, str]: + + def _select_repeater_by_graph(self, repeaters: list[dict[str, Any]], node_id: str, + path_context: list[str], + path_prefix_hex_chars: Optional[int] = None) -> tuple[Optional[dict[str, Any]], float, str]: """Select repeater based on graph evidence. Uses enhanced direct-edge validation and multi-hop path inference. @@ -1314,7 +1316,7 @@ class PathCommand(BaseCommand): graph_n = 2 # When path has longer node IDs (multi-byte), use that for candidate prefix; normalize for graph prefix_n = path_prefix_hex_chars if path_prefix_hex_chars is not None and path_prefix_hex_chars >= 2 else graph_n - + # Find current node position in path try: current_index = path_context.index(node_id) if node_id in path_context else -1 @@ -1372,7 +1374,7 @@ class PathCommand(BaseCommand): if stored_from_key and stored_from_key == candidate_public_key: stored_key_bonus = max(stored_key_bonus, 0.4) # Strong bonus for matching stored key self.logger.debug(f"Found stored public key match for {repeater.get('name', 'unknown')} in edge {candidate_norm}->{next_norm}") - + # Zero-hop bonus: If this repeater has been heard directly by the bot (zero-hop advert), # it's strong evidence it's close and should be preferred, even for intermediate hops. # Only apply when graph_score > 0 (we have graph evidence); otherwise zero-hop alone @@ -1383,7 +1385,7 @@ class PathCommand(BaseCommand): # This repeater has been heard directly - strong evidence it's close to bot zero_hop_bonus = self.graph_zero_hop_bonus self.logger.debug(f"Zero-hop bonus for {repeater.get('name', 'unknown')}: {zero_hop_bonus:.2%} (heard directly by bot)") - + # SNR bonus: If this repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is even stronger evidence than just hop_count == 0, as it means we have actual signal quality data. # Only apply when graph_score > 0 (same rationale as zero_hop_bonus). @@ -1394,10 +1396,10 @@ class PathCommand(BaseCommand): # Use same bonus as zero-hop, but this is more definitive snr_bonus = self.graph_zero_hop_bonus * 1.2 # 20% stronger than zero-hop bonus alone self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: {snr_bonus:.2%} (has SNR data, confirmed zero-hop)") - + # Add stored key bonus, zero-hop bonus, and SNR bonus to graph score graph_score_with_bonus = min(1.0, graph_score + stored_key_bonus + zero_hop_bonus + snr_bonus) - + # Path validation bonus: Check if candidate's stored paths match the current path context # This helps resolve prefix collisions by matching path patterns # For prefix collision resolution: if multiple repeaters share the same prefix, @@ -1414,7 +1416,7 @@ class PathCommand(BaseCommand): LIMIT 10 ''' stored_paths = self.bot.db_manager.execute_query(query, (candidate_public_key,)) - + if stored_paths: # Build the path we're decoding (full path context) decoded_path_hex = ''.join([node.lower() for node in path_context]) @@ -1440,7 +1442,7 @@ class PathCommand(BaseCommand): decoded_nodes = [decoded_path_hex[i:i+path_n] for i in range(0, len(decoded_path_hex), path_n)] if (len(decoded_path_hex) % path_n) != 0: decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)] - + # Count how many nodes appear in both paths (in order) common_segments = 0 min_len = min(len(stored_nodes), len(decoded_nodes)) @@ -1449,7 +1451,7 @@ class PathCommand(BaseCommand): common_segments += 1 else: break - + # Bonus based on common segments and observation count if common_segments >= 2: # At least 2 common segments - significant match @@ -1463,10 +1465,10 @@ class PathCommand(BaseCommand): break # Strong match found except Exception as e: self.logger.debug(f"Error checking path validation for {candidate_prefix}: {e}") - + # Add path validation bonus to graph score graph_score_with_bonus = min(1.0, graph_score_with_bonus + path_validation_bonus) - + # Second attempt: Multi-hop inference if direct edges have low confidence multi_hop_score = 0.0 if self.graph_multi_hop_enabled and graph_score_with_bonus < 0.6 and prev_norm and next_norm: @@ -1481,11 +1483,11 @@ class PathCommand(BaseCommand): if intermediate_prefix == candidate_norm: multi_hop_score = intermediate_score break - + # Use the best score (direct edge with bonus or multi-hop) candidate_score = max(graph_score_with_bonus, multi_hop_score) method = 'graph_multihop' if multi_hop_score > graph_score_with_bonus else 'graph' - + # Apply distance penalty for intermediate hops (prevents selecting very distant repeaters) # This is especially important when graph has strong evidence for long-distance links if self.graph_distance_penalty_enabled and next_norm is not None: # Not final hop @@ -1515,7 +1517,7 @@ class PathCommand(BaseCommand): if candidate_to_next_edge and candidate_to_next_edge.get('geographic_distance'): distance = candidate_to_next_edge.get('geographic_distance') max_distance = max(max_distance, distance) - + # Apply penalty if distance exceeds reasonable hop distance if max_distance > self.graph_max_reasonable_hop_distance_km: # Calculate penalty: stronger penalty for longer distances @@ -1532,24 +1534,24 @@ class PathCommand(BaseCommand): if max_distance > self.graph_max_reasonable_hop_distance_km * 0.8: # 80% of threshold small_penalty = (max_distance - self.graph_max_reasonable_hop_distance_km * 0.8) / (self.graph_max_reasonable_hop_distance_km * 0.2) * self.graph_distance_penalty_strength * 0.5 candidate_score = candidate_score * (1.0 - small_penalty) - + # For final hop (next_norm is None), add bot location proximity bonus if next_norm is None and self.graph_final_hop_proximity_enabled: if self.bot_latitude is not None and self.bot_longitude is not None: repeater_lat = repeater.get('latitude') repeater_lon = repeater.get('longitude') - + # Check if repeater has valid location data (not 0,0) - has_valid_location = (repeater_lat is not None and repeater_lon is not None and + has_valid_location = (repeater_lat is not None and repeater_lon is not None and not (repeater_lat == 0.0 and repeater_lon == 0.0)) - + if has_valid_location: # Calculate distance to bot distance = calculate_distance( self.bot_latitude, self.bot_longitude, repeater_lat, repeater_lon ) - + # Apply max distance threshold if configured if self.graph_final_hop_max_distance > 0 and distance > self.graph_final_hop_max_distance: # Beyond max distance - skip proximity bonus @@ -1559,7 +1561,7 @@ class PathCommand(BaseCommand): # Use configurable normalization distance (default 500km for more aggressive scoring) normalized_distance = min(distance / self.graph_final_hop_proximity_normalization_km, 1.0) proximity_score = 1.0 - normalized_distance - + # For final hop, use a higher effective weight to ensure proximity matters more # The configured weight is a minimum; we boost it for very close repeaters effective_weight = self.graph_final_hop_proximity_weight @@ -1569,10 +1571,10 @@ class PathCommand(BaseCommand): elif distance < self.graph_final_hop_close_threshold_km: # Close - moderate boost effective_weight = min(0.5, self.graph_final_hop_proximity_weight * 1.5) - + # Combine with graph score using effective weight candidate_score = candidate_score * (1.0 - effective_weight) + proximity_score * effective_weight - + self.logger.debug(f"Final hop proximity for {repeater.get('name', 'unknown')}: distance={distance:.1f}km, proximity_score={proximity_score:.3f}, effective_weight={effective_weight:.3f}, combined_score={candidate_score:.3f}") else: # Repeater without valid location data - apply significant penalty for final hop @@ -1581,7 +1583,7 @@ class PathCommand(BaseCommand): location_penalty = 0.5 candidate_score = candidate_score * (1.0 - location_penalty) self.logger.debug(f"Final hop candidate {repeater.get('name', 'unknown')} has no valid location data - applying {location_penalty:.0%} penalty (score: {candidate_score:.3f})") - + # Apply star bias multiplier if repeater is starred # Starred repeaters should get significant advantage in graph selection is_starred = repeater.get('is_starred', False) @@ -1591,33 +1593,33 @@ class PathCommand(BaseCommand): # Cap at 1.0 but allow it to exceed temporarily for comparison # We'll normalize later when converting to confidence self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')} in graph selection (score: {candidate_score:.3f})") - + if candidate_score > best_score: best_score = candidate_score best_repeater = repeater best_method = method - + if best_repeater and best_score > 0.0: # Convert graph score to confidence (graph scores are already 0.0-1.0) # If star bias was applied, the score may exceed 1.0, so cap it appropriately # Higher scores from star bias indicate stronger preference confidence = min(1.0, best_score) if best_score <= 1.0 else 0.95 + (min(0.05, (best_score - 1.0) / self.star_bias_multiplier)) return best_repeater, confidence, best_method or 'graph' - + return None, 0.0, None - - def _format_path_response(self, node_ids: List[str], repeater_info: Dict[str, Dict[str, Any]]) -> str: + + def _format_path_response(self, node_ids: list[str], repeater_info: dict[str, dict[str, Any]]) -> str: """Format the path decode response - + Maintains the order of repeaters as they appear in the path (first to last) """ # Build response lines in path order (first to last as message traveled) lines = [] - + # Process nodes in path order (first to last as message traveled) for node_id in node_ids: info = repeater_info.get(node_id, {}) - + if info.get('found', False): if info.get('collision', False): # Multiple repeaters with same prefix @@ -1627,13 +1629,13 @@ class PathCommand(BaseCommand): # Geographic or graph-based selection name = info.get('name', self.translate('commands.path.unknown_name')) confidence = info.get('confidence', 0.0) - is_graph = info.get('graph_guess', False) - + info.get('graph_guess', False) + # Truncate name if too long truncation = self.translate('commands.path.truncation') if len(name) > 20: name = name[:17] + truncation - + # Add confidence indicator if confidence >= 0.9: confidence_indicator = self.high_confidence_symbol @@ -1641,43 +1643,43 @@ class PathCommand(BaseCommand): confidence_indicator = self.medium_confidence_symbol else: confidence_indicator = self.low_confidence_symbol - + # Use geographic translation key for backward compatibility, or add graph-specific if needed line = self.translate('commands.path.node_geographic', node_id=node_id, name=name, confidence=confidence_indicator) else: # Single repeater found name = info.get('name', self.translate('commands.path.unknown_name')) - + # Truncate name if too long truncation = self.translate('commands.path.truncation') if len(name) > 27: name = name[:24] + truncation - + line = self.translate('commands.path.node_format', node_id=node_id, name=name) else: # Unknown repeater line = self.translate('commands.path.node_unknown', node_id=node_id) - + # Ensure line fits within 130 character limit if len(line) > 130: truncation = self.translate('commands.path.truncation') line = line[:127] + truncation - + lines.append(line) - + # Return all lines - let _send_path_response handle the splitting return "\n".join(lines) - + async def _send_path_response(self, message: MeshMessage, response: str): """Send path response, splitting into multiple messages if necessary""" # Store the complete response for web viewer integration BEFORE splitting # command_manager will prioritize command.last_response over _last_response # This ensures capture_command gets the full response, not just the last split message self.last_response = response - + # Get dynamic max message length based on message type and bot username max_length = self.get_max_message_length(message) - + if len(response) <= max_length: # Single message is fine await self.send_response(message, response) @@ -1687,7 +1689,7 @@ class PathCommand(BaseCommand): lines = response.split('\n') current_message = "" message_count = 0 - + for i, line in enumerate(lines): # Check if adding this line would exceed max_length characters if len(current_message) + len(line) + 1 > max_length: # +1 for newline @@ -1703,7 +1705,7 @@ class PathCommand(BaseCommand): ) await asyncio.sleep(3.0) # Delay between messages (same as other commands) message_count += 1 - + # Start new message with ellipsis on new line at beginning (if not first message) if message_count > 0: current_message = self.translate('commands.path.continuation_start', line=line) @@ -1715,11 +1717,11 @@ class PathCommand(BaseCommand): current_message += f"\n{line}" else: current_message = line - + # Send the last message if there's content (continuation; skip per-user rate limit) if current_message: await self.send_response(message, current_message, skip_user_rate_limit=True) - + async def _extract_path_from_recent_messages(self) -> str: """Extract path from the current message's path information (same as test command). Prefers already-extracted routing_info.path_nodes when present (multi-byte path support). @@ -1751,10 +1753,7 @@ class PathCommand(BaseCommand): if "Direct" in path_string or "0 hops" in path_string: return self.translate('commands.path.direct_connection') - if " via ROUTE_TYPE_" in path_string: - path_part = path_string.split(" via ROUTE_TYPE_")[0] - else: - path_part = path_string + path_part = path_string.split(" via ROUTE_TYPE_")[0] if " via ROUTE_TYPE_" in path_string else path_string if ',' in path_part: return await self._decode_path(path_part) @@ -1766,11 +1765,11 @@ class PathCommand(BaseCommand): except Exception as e: self.logger.error(f"Error extracting path from current message: {e}") return self.translate('commands.path.error_extracting', error=str(e)) - + def get_help(self) -> str: """Get help text for the path command""" return self.translate('commands.path.help') - + def get_help_text(self) -> str: """Get help text for the path command (used by help system)""" return self.get_help() diff --git a/tests/test_path_geo_toggle.py b/tests/test_path_geo_toggle.py new file mode 100644 index 0000000..81ec8c0 --- /dev/null +++ b/tests/test_path_geo_toggle.py @@ -0,0 +1,78 @@ +"""Tests for PathCommand geographic_scoring_enabled config toggle.""" + +import configparser +from pathlib import Path +from unittest.mock import Mock + +import pytest + +from modules.commands.path_command import PathCommand + + +@pytest.fixture +def bot_with_location(): + """Bot mock with a valid Bot section and lat/lon configured.""" + config = configparser.ConfigParser() + config.add_section("Bot") + config.set("Bot", "bot_latitude", "37.7749") + config.set("Bot", "bot_longitude", "-122.4194") + config.add_section("Path_Command") + config.add_section("Logging") + + bot = Mock() + bot.logger = Mock() + bot.config = config + bot.db_manager = Mock() + bot.bot_root = Path("/tmp") + bot._local_root = None + bot.prefix_hex_chars = 2 + bot.key_prefix = lambda pk: (pk or "")[:2] + bot.repeater_manager = Mock() + bot.repeater_manager.get_repeater_devices = Mock(return_value=[]) + bot.web_viewer_integration = None + bot.mesh_graph = None + return bot + + +class TestPathGeoScoringToggle: + """geographic_scoring_enabled config option gates self.geographic_guessing_enabled.""" + + def test_geo_scoring_enabled_by_default_with_valid_coords(self, bot_with_location): + """When geographic_scoring_enabled is unset (defaults True) and coords valid → enabled.""" + cmd = PathCommand(bot_with_location) + assert cmd.geographic_guessing_enabled is True + assert cmd.geographic_scoring_config_enabled is True + + def test_geo_scoring_disabled_via_config(self, bot_with_location): + """When geographic_scoring_enabled = false, guessing stays disabled even with valid coords.""" + bot_with_location.config.set("Path_Command", "geographic_scoring_enabled", "false") + cmd = PathCommand(bot_with_location) + assert cmd.geographic_scoring_config_enabled is False + assert cmd.geographic_guessing_enabled is False + + def test_geo_scoring_enabled_explicit_true(self, bot_with_location): + """Explicit geographic_scoring_enabled = true behaves the same as default.""" + bot_with_location.config.set("Path_Command", "geographic_scoring_enabled", "true") + cmd = PathCommand(bot_with_location) + assert cmd.geographic_scoring_config_enabled is True + assert cmd.geographic_guessing_enabled is True + + def test_geo_guessing_disabled_without_coords_regardless_of_toggle(self, bot_with_location): + """Without configured coordinates, guessing is disabled even when toggle is True.""" + bot_with_location.config.remove_option("Bot", "bot_latitude") + bot_with_location.config.remove_option("Bot", "bot_longitude") + cmd = PathCommand(bot_with_location) + assert cmd.geographic_guessing_enabled is False + + def test_coords_stored_when_scoring_enabled(self, bot_with_location): + """bot_latitude/bot_longitude are stored when geographic scoring is on.""" + cmd = PathCommand(bot_with_location) + assert cmd.bot_latitude == pytest.approx(37.7749) + assert cmd.bot_longitude == pytest.approx(-122.4194) + + def test_coords_stored_but_guessing_off_when_toggle_disabled(self, bot_with_location): + """Coordinates are stored even when toggle disables guessing (for potential future use).""" + bot_with_location.config.set("Path_Command", "geographic_scoring_enabled", "false") + cmd = PathCommand(bot_with_location) + assert cmd.bot_latitude == pytest.approx(37.7749) + assert cmd.geographic_guessing_enabled is False