#!/usr/bin/env python3 """ Repeater Contact Management System Manages a database of repeater contacts and provides purging functionality """ import asyncio import json import time from datetime import datetime, timedelta from typing import Any, Optional from meshcore import EventType from .utils import rate_limited_nominatim_reverse_sync class RepeaterManager: """Manages repeater contacts database and purging operations""" def __init__(self, bot): self.bot = bot self.logger = bot.logger self.db_path = bot.db_manager.db_path # Use the shared database manager self.db_manager = bot.db_manager # Initialize repeater-specific tables self._init_repeater_tables() # Initialize auto-purge monitoring self.contact_limit = 300 # MeshCore device limit (will be updated from device info) self.auto_purge_threshold = 280 # Start purging when 280+ contacts # Respect auto_manage_contacts: manual mode (false) = no auto-purge; device/bot = auto-purge on auto_manage = bot.config.get('Bot', 'auto_manage_contacts', fallback='false').lower() self.auto_purge_enabled = (auto_manage != 'false') # Initialize companion purge settings self.companion_purge_enabled = bot.config.getboolean('Companion_Purge', 'companion_purge_enabled', fallback=False) self.companion_dm_threshold_days = max(0, bot.config.getint('Companion_Purge', 'companion_dm_threshold_days', fallback=30)) self.companion_advert_threshold_days = max(0, bot.config.getint('Companion_Purge', 'companion_advert_threshold_days', fallback=30)) self.companion_min_inactive_days = max(0, bot.config.getint('Companion_Purge', 'companion_min_inactive_days', fallback=30)) # Geocoding cache: packet_hash -> timestamp (to prevent duplicate geocoding within 1 minute) self.geocoding_cache = {} self.geocoding_cache_window = 60 # 1 minute window # Prevent overlapping auto-purge runs and duplicate per-key purge attempts. self._auto_purge_lock = asyncio.Lock() self._auto_purge_in_progress = False self._purge_inflight_keys = set() def _start_purge_attempt(self, public_key: str, contact_type: str) -> bool: """Mark a purge as in-flight; return False when another attempt is already active.""" if public_key in self._purge_inflight_keys: self.logger.info( f"Skipping duplicate {contact_type} purge for {public_key[:16]}... (already in progress)" ) return False self._purge_inflight_keys.add(public_key) return True def _finish_purge_attempt(self, public_key: str): """Clear in-flight marker for a purge attempt.""" self._purge_inflight_keys.discard(public_key) def _init_repeater_tables(self): """Ensure repeater-specific tables exist (created by migrations).""" try: with self.db_manager.connection() as conn: required_tables = [ "repeater_contacts", "complete_contact_tracking", "daily_stats", "unique_advert_packets", "purging_log", "mesh_connections", "observed_paths", ] missing = [] for t in required_tables: cur = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name=?", (t,), ) if cur.fetchone() is None: missing.append(t) if missing: msg = ( "Missing repeater/graph database tables: " + ", ".join(missing) + ". Run the bot once to apply migrations." ) self.logger.error(msg) raise RuntimeError(msg) self.logger.info("Repeater contacts database initialized successfully") except Exception as e: self.logger.error(f"Failed to initialize repeater database: {e}") raise async def track_contact_advertisement(self, advert_data: dict, signal_info: Optional[dict] = None, packet_hash: Optional[str] = None) -> bool: """Track any contact advertisement in the complete tracking database""" try: # Extract basic information public_key = advert_data.get('public_key', '') name = advert_data.get('name', advert_data.get('adv_name', 'Unknown')) device_type = advert_data.get('type', 'Unknown') if not public_key: self.logger.warning("No public key in advertisement data") return False # Determine role and device type role = self._determine_contact_role(advert_data) device_type_str = self._determine_device_type(device_type, name, advert_data) # Extract signal information signal_strength = None snr = None hop_count = None if signal_info: hop_count = signal_info.get('hops', signal_info.get('hop_count')) # Only save RSSI/SNR for zero-hop (direct) connections # For multi-hop packets, signal strength represents the last hop, not the source if hop_count == 0: signal_strength = signal_info.get('rssi', signal_info.get('signal_strength')) snr = signal_info.get('snr') self.logger.debug(f"📡 Saving signal data for direct connection: RSSI={signal_strength}, SNR={snr}") else: self.logger.debug(f"📡 Skipping signal data for {hop_count}-hop connection (not direct)") # Extract path information from advert_data out_path = advert_data.get('out_path', '') out_path_len = advert_data.get('out_path_len', -1) out_bytes_per_hop = advert_data.get('out_bytes_per_hop') # Wrap all DB operations in a single transaction for atomicity with self.db_manager.connection() as conn: # Check if this packet_hash was already processed for this contact # This prevents duplicate writes of the same advert packet if packet_hash and packet_hash != "0000000000000000": existing_packet = self.db_manager.execute_query_on_connection( conn, 'SELECT id FROM unique_advert_packets WHERE public_key = ? AND packet_hash = ?', (public_key, packet_hash) ) if existing_packet: # This packet_hash was already processed - skip contact update self.logger.debug(f"Skipping duplicate advert packet for {name}: {packet_hash[:8]}... (already processed)") return True # Return True since packet was already tracked (not an error) # Check if this contact is already in our complete tracking existing = self.db_manager.execute_query_on_connection( conn, 'SELECT id, advert_count, last_heard, latitude, longitude, city, state, country, out_path, out_path_len, out_bytes_per_hop FROM complete_contact_tracking WHERE public_key = ?', (public_key,) ) current_time = datetime.now() # Extract location data first (without geocoding) self.logger.debug(f"🔍 Extracting location data for {name}...") location_info = self._extract_location_data(advert_data, should_geocode=False) self.logger.debug(f"📍 Location data extracted: {location_info}") # Check if we need to perform geocoding based on location changes existing_data = existing[0] if existing else None should_geocode, location_info = self._should_geocode_location(location_info, existing_data, name, packet_hash) # Re-extract location data with geocoding if needed if should_geocode: self.logger.debug(f"📍 Re-extracting location data with geocoding for {name}") location_info = self._extract_location_data(advert_data, should_geocode=True, packet_hash=packet_hash) self.logger.debug(f"📍 Location data with geocoding: {location_info}") # Update geocoding cache if we have a valid packet_hash (skip invalid/default hashes) if packet_hash and packet_hash != "0000000000000000" and location_info.get('latitude') and location_info.get('longitude'): self.geocoding_cache[packet_hash] = time.time() self.logger.debug(f"📍 Cached geocoding for packet_hash {packet_hash[:16]}...") if existing: # Update existing entry advert_count = existing[0]['advert_count'] + 1 existing_out_path = existing[0].get('out_path') existing_out_path_len = existing[0].get('out_path_len') # Only update out_path and out_path_len if they are NULL/empty (first-seen path) # This preserves the first (shortest) path and doesn't overwrite it final_out_path = out_path if (not existing_out_path or existing_out_path == '') else existing_out_path final_out_path_len = out_path_len if (existing_out_path_len is None or existing_out_path_len == -1) else existing_out_path_len final_out_bytes_per_hop = out_bytes_per_hop if (out_bytes_per_hop is not None) else existing[0].get('out_bytes_per_hop') self.db_manager.execute_update_on_connection(conn, ''' UPDATE complete_contact_tracking SET name = ?, last_heard = ?, advert_count = ?, role = ?, device_type = ?, latitude = ?, longitude = ?, city = ?, state = ?, country = ?, raw_advert_data = ?, signal_strength = ?, snr = ?, hop_count = ?, last_advert_timestamp = ?, out_path = ?, out_path_len = ?, out_bytes_per_hop = ? WHERE public_key = ? ''', ( name, current_time, advert_count, role, device_type_str, location_info['latitude'], location_info['longitude'], location_info['city'], location_info['state'], location_info['country'], json.dumps(advert_data), signal_strength, snr, hop_count, current_time, final_out_path, final_out_path_len, final_out_bytes_per_hop, public_key )) self.logger.debug(f"Updated contact tracking: {name} ({role}) - count: {advert_count}") else: # Insert new entry self.db_manager.execute_update_on_connection(conn, ''' INSERT INTO complete_contact_tracking (public_key, name, role, device_type, first_heard, last_heard, advert_count, latitude, longitude, city, state, country, raw_advert_data, signal_strength, snr, hop_count, last_advert_timestamp, out_path, out_path_len, out_bytes_per_hop) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( public_key, name, role, device_type_str, current_time, current_time, 1, location_info['latitude'], location_info['longitude'], location_info['city'], location_info['state'], location_info['country'], json.dumps(advert_data), signal_strength, snr, hop_count, current_time, out_path, out_path_len, out_bytes_per_hop )) self.logger.info(f"Added new contact to complete tracking: {name} ({role})") # Update the currently_tracked flag based on device contact list self._update_currently_tracked_status_on_conn(conn, public_key) # Track daily advertisement statistics (with packet_hash for unique tracking) self._track_daily_advertisement_on_conn(conn, public_key, name, role, device_type_str, location_info, signal_strength, snr, hop_count, current_time, packet_hash=packet_hash) conn.commit() return True except Exception as e: self.logger.error(f"Error tracking contact advertisement: {e}") return False def _track_daily_advertisement_on_conn(self, conn, public_key: str, name: str, role: str, device_type: str, location_info: dict, signal_strength: Optional[float], snr: Optional[float], hop_count: Optional[int], timestamp: datetime, packet_hash: Optional[str] = None): """Track daily advertisement statistics on an existing connection (no commit). Same logic as _track_daily_advertisement but uses caller's connection for transactional grouping. """ from datetime import date today = date.today() is_unique_packet = False if packet_hash and packet_hash != "0000000000000000": existing_packet = self.db_manager.execute_query_on_connection( conn, 'SELECT id FROM unique_advert_packets WHERE date = ? AND public_key = ? AND packet_hash = ?', (today, public_key, packet_hash) ) if not existing_packet: self.db_manager.execute_update_on_connection(conn, ''' INSERT INTO unique_advert_packets (date, public_key, packet_hash, first_seen) VALUES (?, ?, ?, ?) ''', (today, public_key, packet_hash, timestamp)) is_unique_packet = True self.logger.debug(f"New unique advert packet for {name}: {packet_hash[:8]}...") else: self.logger.debug(f"Duplicate advert packet for {name}: {packet_hash[:8]}... (already counted)") else: is_unique_packet = True if is_unique_packet: existing_daily = self.db_manager.execute_query_on_connection( conn, 'SELECT id, advert_count, first_advert_time FROM daily_stats WHERE date = ? AND public_key = ?', (today, public_key) ) if existing_daily: unique_count = self.db_manager.execute_query_on_connection( conn, 'SELECT COUNT(*) FROM unique_advert_packets WHERE date = ? AND public_key = ?', (today, public_key) ) daily_advert_count = unique_count[0]['COUNT(*)'] if unique_count else existing_daily[0]['advert_count'] + 1 self.db_manager.execute_update_on_connection(conn, ''' UPDATE daily_stats SET advert_count = ?, last_advert_time = ? WHERE date = ? AND public_key = ? ''', (daily_advert_count, timestamp, today, public_key)) self.logger.debug(f"Updated daily stats for {name}: {daily_advert_count} unique adverts today") else: self.db_manager.execute_update_on_connection(conn, ''' INSERT INTO daily_stats (date, public_key, advert_count, first_advert_time, last_advert_time) VALUES (?, ?, ?, ?, ?) ''', (today, public_key, 1, timestamp, timestamp)) self.logger.debug(f"Added daily stats for {name}: first unique advert today") async def _track_daily_advertisement(self, public_key: str, name: str, role: str, device_type: str, location_info: dict, signal_strength: Optional[float], snr: Optional[float], hop_count: Optional[int], timestamp: datetime, packet_hash: Optional[str] = None): """Track daily advertisement statistics for accurate time-based reporting. Args: public_key: The public key of the node name: The name of the node role: The role of the node device_type: The device type string location_info: Location information dictionary signal_strength: Signal strength (RSSI) snr: Signal-to-noise ratio hop_count: Number of hops timestamp: Timestamp of the advert packet_hash: Optional packet hash for unique packet tracking """ try: from datetime import date # Get today's date today = date.today() # Track unique packet hash if provided (for deduplication) is_unique_packet = False if packet_hash and packet_hash != "0000000000000000": try: # Check if we've already seen this packet hash today existing_packet = self.db_manager.execute_query( 'SELECT id FROM unique_advert_packets WHERE date = ? AND public_key = ? AND packet_hash = ?', (today, public_key, packet_hash) ) if not existing_packet: # This is a new unique packet - insert it self.db_manager.execute_update(''' INSERT INTO unique_advert_packets (date, public_key, packet_hash, first_seen) VALUES (?, ?, ?, ?) ''', (today, public_key, packet_hash, timestamp)) is_unique_packet = True self.logger.debug(f"New unique advert packet for {name}: {packet_hash[:8]}...") else: # We've already seen this packet hash today - don't count it again self.logger.debug(f"Duplicate advert packet for {name}: {packet_hash[:8]}... (already counted)") except Exception as e: self.logger.debug(f"Error tracking unique packet hash: {e}") # Fall through to count it anyway if unique tracking fails is_unique_packet = True else: # No packet hash provided, count it as unique (can't deduplicate) is_unique_packet = True # Only increment count if this is a unique packet if is_unique_packet: # Check if we already have an entry for this contact today existing_daily = self.db_manager.execute_query( 'SELECT id, advert_count, first_advert_time FROM daily_stats WHERE date = ? AND public_key = ?', (today, public_key) ) if existing_daily: # Update existing daily entry - count unique packets only # Count distinct packet hashes for today from unique_advert_packets table unique_count = self.db_manager.execute_query( 'SELECT COUNT(*) FROM unique_advert_packets WHERE date = ? AND public_key = ?', (today, public_key) ) daily_advert_count = unique_count[0]['COUNT(*)'] if unique_count else existing_daily[0]['advert_count'] + 1 self.db_manager.execute_update(''' UPDATE daily_stats SET advert_count = ?, last_advert_time = ? WHERE date = ? AND public_key = ? ''', (daily_advert_count, timestamp, today, public_key)) self.logger.debug(f"Updated daily stats for {name}: {daily_advert_count} unique adverts today") else: # Insert new daily entry self.db_manager.execute_update(''' INSERT INTO daily_stats (date, public_key, advert_count, first_advert_time, last_advert_time) VALUES (?, ?, ?, ?, ?) ''', (today, public_key, 1, timestamp, timestamp)) self.logger.debug(f"Added daily stats for {name}: first unique advert today") except Exception as e: self.logger.error(f"Error tracking daily advertisement: {e}") def _determine_contact_role(self, contact_data: dict) -> str: """Determine the role of a contact based on MeshCore specifications""" from .enums import DeviceRole # First priority: Use the mode field from parsed advertisement data mode = contact_data.get('mode', '') if mode: # Convert DeviceRole enum values to lowercase role strings if mode == DeviceRole.Repeater.value: return 'repeater' elif mode == DeviceRole.RoomServer.value: return 'roomserver' elif mode == DeviceRole.Companion.value: return 'companion' elif mode == 'Sensor': return 'sensor' else: # Handle any other mode values return mode.lower() # Fallback to legacy detection methods name = contact_data.get('name', contact_data.get('adv_name', '')).lower() device_type = contact_data.get('type', 0) # Check device type (legacy indicator) if device_type == 2: return 'repeater' elif device_type == 3: return 'roomserver' # Check name-based indicators for role detection (legacy fallback) if any(keyword in name for keyword in ['repeater', 'rpt', 'rp']): return 'repeater' elif any(keyword in name for keyword in ['room', 'server', 'rs', 'roomserver']): return 'roomserver' elif any(keyword in name for keyword in ['sensor', 'sens']): return 'sensor' elif any(keyword in name for keyword in ['bot', 'automated', 'automation']): return 'bot' elif any(keyword in name for keyword in ['gateway', 'gw', 'bridge']): return 'gateway' else: # Default to companion for unknown contacts (human users) return 'companion' def _determine_device_type(self, device_type: int, name: str, advert_data: Optional[dict] = None) -> str: """Determine device type string from numeric type and name following MeshCore specs""" from .enums import DeviceRole # First priority: Use the mode field from parsed advertisement data if advert_data and advert_data.get('mode'): mode = advert_data.get('mode') if mode == DeviceRole.Repeater.value: return 'Repeater' elif mode == DeviceRole.RoomServer.value: return 'RoomServer' elif mode == DeviceRole.Companion.value: return 'Companion' elif mode == 'Sensor': return 'Sensor' else: # Handle any other mode values return str(mode) # Fallback to legacy detection methods if device_type == 3: return 'RoomServer' elif device_type == 2: return 'Repeater' elif device_type == 1: return 'Companion' else: # Fallback to name-based detection name_lower = name.lower() if 'room' in name_lower or 'server' in name_lower or 'roomserver' in name_lower: return 'RoomServer' elif 'repeater' in name_lower or 'rpt' in name_lower: return 'Repeater' elif 'sensor' in name_lower or 'sens' in name_lower: return 'Sensor' elif 'gateway' in name_lower or 'gw' in name_lower or 'bridge' in name_lower: return 'Gateway' elif 'bot' in name_lower or 'automated' in name_lower: return 'Bot' else: return 'Companion' # Default to companion for human users def _update_currently_tracked_status_on_conn(self, conn, public_key: str): """Update the is_currently_tracked flag on an existing connection (no commit).""" is_tracked = False if hasattr(self.bot.meshcore, 'contacts'): for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if contact_data.get('public_key', contact_key) == public_key: is_tracked = True break self.db_manager.execute_update_on_connection( conn, 'UPDATE complete_contact_tracking SET is_currently_tracked = ? WHERE public_key = ?', (is_tracked, public_key) ) async def _update_currently_tracked_status(self, public_key: str): """Update the is_currently_tracked flag based on device contact list""" try: # Check if this repeater is currently in the device's contact list is_tracked = False if hasattr(self.bot.meshcore, 'contacts'): for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if contact_data.get('public_key', contact_key) == public_key: is_tracked = True break # Update the flag self.db_manager.execute_update( 'UPDATE complete_contact_tracking SET is_currently_tracked = ? WHERE public_key = ?', (is_tracked, public_key) ) except Exception as e: self.logger.error(f"Error updating currently tracked status: {e}") async def get_complete_contact_database(self, role_filter: Optional[str] = None, include_historical: bool = True) -> list[dict]: """Get complete contact database for path estimation and analysis""" try: if include_historical: if role_filter: # Get all contacts of specific role ever heard query = ''' SELECT public_key, name, role, device_type, first_heard, last_heard, advert_count, latitude, longitude, city, state, country, signal_strength, hop_count, is_currently_tracked, last_advert_timestamp, is_starred FROM complete_contact_tracking WHERE role = ? ORDER BY last_heard DESC ''' results = self.db_manager.execute_query(query, (role_filter,)) else: # Get all contacts ever heard query = ''' SELECT public_key, name, role, device_type, first_heard, last_heard, advert_count, latitude, longitude, city, state, country, signal_strength, hop_count, is_currently_tracked, last_advert_timestamp, is_starred FROM complete_contact_tracking ORDER BY last_heard DESC ''' results = self.db_manager.execute_query(query) else: if role_filter: # Get only currently tracked contacts of specific role query = ''' SELECT public_key, name, role, device_type, first_heard, last_heard, advert_count, latitude, longitude, city, state, country, signal_strength, hop_count, is_currently_tracked, last_advert_timestamp FROM complete_contact_tracking WHERE role = ? AND is_currently_tracked = 1 ORDER BY last_heard DESC ''' results = self.db_manager.execute_query(query, (role_filter,)) else: # Get only currently tracked contacts query = ''' SELECT public_key, name, role, device_type, first_heard, last_heard, advert_count, latitude, longitude, city, state, country, signal_strength, hop_count, is_currently_tracked, last_advert_timestamp FROM complete_contact_tracking WHERE is_currently_tracked = 1 ORDER BY last_heard DESC ''' results = self.db_manager.execute_query(query) return results except Exception as e: self.logger.error(f"Error getting complete repeater database: {e}") return [] async def get_contact_statistics(self) -> dict: """Get statistics about the contact tracking database""" try: stats = {} # Total contacts ever heard total_result = self.db_manager.execute_query( 'SELECT COUNT(*) as count FROM complete_contact_tracking' ) stats['total_heard'] = total_result[0]['count'] if total_result else 0 # Currently tracked contacts current_result = self.db_manager.execute_query( 'SELECT COUNT(*) as count FROM complete_contact_tracking WHERE is_currently_tracked = 1' ) stats['currently_tracked'] = current_result[0]['count'] if current_result else 0 # Recent activity (last 24 hours) recent_result = self.db_manager.execute_query( 'SELECT COUNT(*) as count FROM complete_contact_tracking WHERE last_heard > datetime("now", "-1 day")' ) stats['recent_activity'] = recent_result[0]['count'] if recent_result else 0 # Role breakdown role_result = self.db_manager.execute_query( 'SELECT role, COUNT(*) as count FROM complete_contact_tracking GROUP BY role' ) stats['by_role'] = {row['role']: row['count'] for row in role_result} # Device type breakdown type_result = self.db_manager.execute_query( 'SELECT device_type, COUNT(*) as count FROM complete_contact_tracking GROUP BY device_type' ) stats['by_type'] = {row['device_type']: row['count'] for row in type_result} return stats except Exception as e: self.logger.error(f"Error getting contact statistics: {e}") return {} async def get_contacts_by_role(self, role: str, include_historical: bool = True) -> list[dict]: """Get contacts filtered by specific MeshCore role (repeater, roomserver, companion, sensor, gateway, bot)""" return await self.get_complete_contact_database(role_filter=role, include_historical=include_historical) async def get_repeater_devices(self, include_historical: bool = True) -> list[dict]: """Get all repeater devices (repeaters and roomservers) following MeshCore terminology""" repeater_db = await self.get_complete_contact_database(role_filter='repeater', include_historical=include_historical) roomserver_db = await self.get_complete_contact_database(role_filter='roomserver', include_historical=include_historical) return repeater_db + roomserver_db async def get_companion_contacts(self, include_historical: bool = True) -> list[dict]: """Get all companion contacts (human users) following MeshCore terminology""" return await self.get_complete_contact_database(role_filter='companion', include_historical=include_historical) async def get_sensor_devices(self, include_historical: bool = True) -> list[dict]: """Get all sensor devices following MeshCore terminology""" return await self.get_complete_contact_database(role_filter='sensor', include_historical=include_historical) async def get_gateway_devices(self, include_historical: bool = True) -> list[dict]: """Get all gateway devices following MeshCore terminology""" return await self.get_complete_contact_database(role_filter='gateway', include_historical=include_historical) async def get_bot_devices(self, include_historical: bool = True) -> list[dict]: """Get all bot/automated devices following MeshCore terminology""" return await self.get_complete_contact_database(role_filter='bot', include_historical=include_historical) async def check_and_auto_purge(self) -> bool: """Check contact limit and auto-purge repeaters and companions if needed""" # Avoid overlapping auto-purge runs from concurrent callers. if self._auto_purge_in_progress: self.logger.debug("Auto-purge already running, skipping overlapping check") return False self._auto_purge_in_progress = True try: async with self._auto_purge_lock: if not self.auto_purge_enabled: return False # Get current contact count current_count = len(self.bot.meshcore.contacts) if current_count >= self.auto_purge_threshold: self.logger.info(f"🔄 Auto-purge triggered: {current_count}/{self.contact_limit} contacts (threshold: {self.auto_purge_threshold})") # Calculate how many to purge target_count = self.auto_purge_threshold - 20 # Leave some buffer purge_count = current_count - target_count if purge_count > 0: # First try to purge repeaters repeater_success = await self._auto_purge_repeaters(purge_count) remaining_count = len(self.bot.meshcore.contacts) # If still above threshold and companion purging is enabled, purge companions if remaining_count >= self.auto_purge_threshold and self.companion_purge_enabled: remaining_purge_count = remaining_count - target_count self.logger.info(f"Still above threshold after repeater purge, purging {remaining_purge_count} companions...") companion_success = await self._auto_purge_companions(remaining_purge_count) if repeater_success or companion_success: final_count = len(self.bot.meshcore.contacts) self.logger.info(f"✅ Auto-purge completed, now at {final_count}/{self.contact_limit} contacts") return True elif repeater_success: self.logger.info(f"✅ Auto-purged {purge_count} repeaters, now at {remaining_count}/{self.contact_limit} contacts") return True else: self.logger.warning(f"❌ Auto-purge failed to remove {purge_count} contacts") return False return False except Exception as e: self.logger.error(f"Error in auto-purge check: {e}") return False finally: self._auto_purge_in_progress = False async def _auto_purge_repeaters(self, count: int) -> bool: """Automatically purge repeaters using intelligent selection""" try: # Get all repeaters sorted by priority (least important first) repeaters_to_purge = await self._get_repeaters_for_purging(count) if not repeaters_to_purge: self.logger.warning("No repeaters available for auto-purge") # Log some debugging info total_contacts = len(self.bot.meshcore.contacts) repeater_count = sum(1 for contact_data in list(self.bot.meshcore.contacts.values()) if self._is_repeater_device(contact_data)) self.logger.debug(f"Debug: {total_contacts} total contacts, {repeater_count} repeaters found") return False purged_count = 0 for repeater in repeaters_to_purge: try: # Use the improved purge method public_key = repeater['public_key'] success = await self.purge_repeater_from_contacts(public_key, "Auto-purge - contact limit management") if success: purged_count += 1 self.logger.info(f"🗑️ Auto-purged repeater: {repeater['name']} (last seen: {repeater['last_seen']})") else: self.logger.warning(f"Failed to auto-purge repeater: {repeater['name']}") except Exception as e: self.logger.error(f"Error auto-purging repeater {repeater['name']}: {e}") continue self.logger.info(f"✅ Auto-purge completed: {purged_count}/{count} repeaters removed") return purged_count > 0 except Exception as e: self.logger.error(f"Error in auto-purge execution: {e}") return False async def _auto_purge_companions(self, count: int) -> bool: """Automatically purge companion contacts using intelligent selection""" try: if not self.companion_purge_enabled: self.logger.debug("Companion purging is disabled") return False # Get all companions sorted by priority (most inactive first) companions_to_purge = await self._get_companions_for_purging(count) if not companions_to_purge: self.logger.warning("No companions available for auto-purge") # Log some debugging info total_contacts = len(self.bot.meshcore.contacts) companion_count = sum(1 for contact_data in list(self.bot.meshcore.contacts.values()) if self._is_companion_device(contact_data)) self.logger.debug(f"Debug: {total_contacts} total contacts, {companion_count} companions found") return False purged_count = 0 for i, companion in enumerate(companions_to_purge): try: public_key = companion['public_key'] # Get activity info (already formatted as 'never' if no activity) last_dm = companion.get('last_dm', 'never') last_advert = companion.get('last_advert', 'never') days_inactive = companion.get('days_inactive', 'unknown') success = await self.purge_companion_from_contacts(public_key, "Auto-purge - contact limit management") if success: purged_count += 1 self.logger.info(f"🗑️ Auto-purged companion: {companion['name']} (DM: {last_dm}, Advert: {last_advert}, Inactive: {days_inactive}d)") else: self.logger.warning(f"Failed to auto-purge companion: {companion['name']}") # Add delay between removals to avoid overwhelming the radio # Use longer delay (2 seconds) to give radio time to process if i < len(companions_to_purge) - 1: await asyncio.sleep(2) except Exception as e: self.logger.error(f"Error auto-purging companion {companion['name']}: {e}") # Still add delay even on error if i < len(companions_to_purge) - 1: await asyncio.sleep(2) continue self.logger.info(f"✅ Auto-purge completed: {purged_count}/{count} companions removed") return purged_count > 0 except Exception as e: self.logger.error(f"Error in companion auto-purge execution: {e}") return False async def _get_repeaters_for_purging(self, count: int) -> list[dict]: """Get list of repeaters to purge based on intelligent criteria from device contacts""" try: # Get repeaters directly from device contacts, not database device_repeaters = [] for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): # Check if this is a repeater device if self._is_repeater_device(contact_data): public_key = contact_data.get('public_key', contact_key) name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) device_type = 'Repeater' if contact_data.get('type') == 3: device_type = 'RoomServer' # Get last seen timestamp last_seen = contact_data.get('last_seen', contact_data.get('last_advert', contact_data.get('timestamp'))) if last_seen: try: if isinstance(last_seen, str): last_seen_dt = datetime.fromisoformat(last_seen.replace('Z', '+00:00')) elif isinstance(last_seen, (int, float)): last_seen_dt = datetime.fromtimestamp(last_seen) else: last_seen_dt = last_seen except: last_seen_dt = datetime.now() - timedelta(days=30) # Default to old else: last_seen_dt = datetime.now() - timedelta(days=30) # Default to old device_repeaters.append({ 'public_key': public_key, 'name': name, 'device_type': device_type, 'last_seen': last_seen_dt.strftime('%Y-%m-%d %H:%M:%S'), 'latitude': contact_data.get('adv_lat'), 'longitude': contact_data.get('adv_lon'), 'city': contact_data.get('city'), 'state': contact_data.get('state'), 'country': contact_data.get('country') }) # Sort by priority (oldest first, with location data as secondary factor) device_repeaters.sort(key=lambda x: ( # Priority 1: Very old (7+ days) 1 if (datetime.now() - datetime.strptime(x['last_seen'], '%Y-%m-%d %H:%M:%S')).days >= 7 else # Priority 2: Medium old (3-7 days) 2 if (datetime.now() - datetime.strptime(x['last_seen'], '%Y-%m-%d %H:%M:%S')).days >= 3 else # Priority 3: Recent (0-3 days) 3, # Within same priority, prefer repeaters without location data, then oldest first 0 if not (x.get('latitude') and x.get('longitude')) else 1, x['last_seen'] )) # Apply additional filtering criteria filtered_repeaters = [] for repeater in device_repeaters: # Skip repeaters with very recent activity (last 2 hours) - more lenient last_seen_dt = datetime.strptime(repeater['last_seen'], '%Y-%m-%d %H:%M:%S') if last_seen_dt > datetime.now() - timedelta(hours=2): continue # Don't skip repeaters with location data - location data is common and not a reason to preserve # The sorting logic above already prioritizes repeaters without location data filtered_repeaters.append(repeater) if len(filtered_repeaters) >= count: break self.logger.debug(f"Found {len(device_repeaters)} device repeaters, {len(filtered_repeaters)} available for purging") # Additional debugging info if len(filtered_repeaters) == 0 and len(device_repeaters) > 0: self.logger.debug("No repeaters available for purging - checking filtering criteria:") recent_count = 0 location_count = 0 for repeater in device_repeaters: last_seen_dt = datetime.strptime(repeater['last_seen'], '%Y-%m-%d %H:%M:%S') if last_seen_dt > datetime.now() - timedelta(hours=2): recent_count += 1 if repeater['latitude'] and repeater['longitude']: location_count += 1 self.logger.debug(f"Filtering stats: {recent_count} too recent, {location_count} with location data") return filtered_repeaters[:count] except Exception as e: self.logger.error(f"Error getting repeaters for purging: {e}") return [] async def _get_companions_for_purging(self, count: int) -> list[dict]: """Get list of companion contacts to purge based on activity scoring""" try: if not self.companion_purge_enabled: self.logger.debug("Companion purging is disabled") return [] current_time = datetime.now() scored_companions = [] # Get activity data from database for all companions for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): # Check if this is a companion device if not self._is_companion_device(contact_data): continue public_key = contact_data.get('public_key', contact_key) name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) # Skip if in ACL (never purge ACL members) if self._is_in_acl(public_key): self.logger.debug(f"Skipping companion {name} - in ACL") continue # Get activity data from database last_dm = self._get_last_dm_activity(public_key) last_advert = self._get_last_advert_activity(public_key) # Get tracking data from complete_contact_tracking table # Try with role filter first, then without if no results tracking_query = ''' SELECT last_heard, last_advert_timestamp, advert_count, first_heard FROM complete_contact_tracking WHERE public_key = ? ORDER BY CASE WHEN role = 'companion' THEN 0 ELSE 1 END LIMIT 1 ''' tracking_data = self.db_manager.execute_query(tracking_query, (public_key,)) # Get DM count from message_stats dm_count = 0 if name: dm_query = ''' SELECT COUNT(*) as dm_count FROM message_stats WHERE sender_id = ? AND is_dm = 1 ''' dm_results = self.db_manager.execute_query(dm_query, (name,)) if dm_results and dm_results[0].get('dm_count'): dm_count = dm_results[0]['dm_count'] # Determine most recent activity last_activity = None if last_dm and last_advert: last_activity = max(last_dm, last_advert) elif last_dm: last_activity = last_dm elif last_advert: last_activity = last_advert elif tracking_data and tracking_data[0].get('last_heard'): # Fallback to last_heard from tracking try: last_heard = tracking_data[0]['last_heard'] if isinstance(last_heard, str): last_activity = datetime.fromisoformat(last_heard.replace('Z', '+00:00')) elif isinstance(last_heard, (int, float)): last_activity = datetime.fromtimestamp(last_heard) elif isinstance(last_heard, datetime): last_activity = last_heard except: pass # Skip very recently active (last 2 hours) - protect active users if last_activity and last_activity > current_time - timedelta(hours=2): continue # Calculate days since last activity days_inactive = None if last_activity: days_inactive = (current_time - last_activity).days else: # No activity found - use last_seen from device or default to very old last_seen = contact_data.get('last_seen', contact_data.get('last_advert', contact_data.get('timestamp'))) if last_seen: try: if isinstance(last_seen, str): last_seen_dt = datetime.fromisoformat(last_seen.replace('Z', '+00:00')) elif isinstance(last_seen, (int, float)): last_seen_dt = datetime.fromtimestamp(last_seen) else: last_seen_dt = last_seen days_inactive = (current_time - last_seen_dt).days except: days_inactive = 999 # Very old if we can't parse else: days_inactive = 999 # Very old if no data # Get activity counts advert_count = 0 if tracking_data and tracking_data[0].get('advert_count'): advert_count = tracking_data[0]['advert_count'] total_activity = dm_count + advert_count # Calculate purge score (lower = more eligible for purging) # Score factors: # 1. Days inactive (primary factor - more days = lower score) # 2. Total activity count (lower activity = lower score) # 3. Recency bonus (recent activity = higher score, but we already filtered < 2 hours) # Base score: days inactive (more days = lower score) # Use negative so higher days = lower score base_score = -days_inactive if days_inactive is not None else -999 # Activity bonus: more activity = slightly higher score (less purgeable) # But this is secondary to inactivity activity_bonus = min(total_activity * 0.1, 10) # Cap at 10 points # Final score: lower = more purgeable purge_score = base_score + activity_bonus scored_companions.append({ 'public_key': public_key, 'name': name, 'last_dm': last_dm.isoformat() if last_dm else 'never', 'last_advert': last_advert.isoformat() if last_advert else 'never', 'last_activity': last_activity.isoformat() if last_activity else None, 'days_inactive': days_inactive, 'dm_count': dm_count, 'advert_count': advert_count, 'total_activity': total_activity, 'purge_score': purge_score, 'latitude': contact_data.get('adv_lat'), 'longitude': contact_data.get('adv_lon'), 'city': contact_data.get('city'), 'state': contact_data.get('state'), 'country': contact_data.get('country') }) # Sort by purge score (lowest first = most purgeable first) # Secondary sort: without location data first (less useful contacts) scored_companions.sort(key=lambda x: ( x['purge_score'], # Lower score = more purgeable 0 if not (x.get('latitude') and x.get('longitude')) else 1 # No location = more purgeable )) # Enhanced debugging contacts_snapshot = list(self.bot.meshcore.contacts.items()) total_companions_checked = sum(1 for _, contact_data in contacts_snapshot if self._is_companion_device(contact_data)) acl_skipped = sum(1 for contact_key, contact_data in contacts_snapshot if self._is_companion_device(contact_data) and self._is_in_acl(contact_data.get('public_key', contact_key))) recent_skipped = total_companions_checked - acl_skipped - len(scored_companions) self.logger.debug(f"Companion purge analysis: {total_companions_checked} total companions, " f"{acl_skipped} in ACL (skipped), {recent_skipped} recently active (skipped), " f"{len(scored_companions)} scored and ranked") if scored_companions: # Log top candidates for debugging top_candidates = scored_companions[:min(5, len(scored_companions))] candidate_info = [f"{c['name']} (score={c['purge_score']:.1f}, inactive={c['days_inactive']}d)" for c in top_candidates] self.logger.debug(f"Top purge candidates: {', '.join(candidate_info)}") return scored_companions[:count] except Exception as e: self.logger.error(f"Error getting companions for purging: {e}") return [] def _extract_location_data(self, contact_data: dict, should_geocode: bool = True, packet_hash: Optional[str] = None) -> dict[str, Any]: """Extract location data from contact_data JSON""" location_info: dict[str, Any] = { 'latitude': None, 'longitude': None, 'city': None, 'state': None, 'country': None } try: # First check for direct lat/lon fields (from parsed advert data) if 'lat' in contact_data and 'lon' in contact_data: try: location_info['latitude'] = float(contact_data['lat']) location_info['longitude'] = float(contact_data['lon']) self.logger.debug(f"📍 Direct lat/lon found: {location_info['latitude']}, {location_info['longitude']}") # Don't return here - continue to geocoding logic below except (ValueError, TypeError) as e: self.logger.warning(f"Failed to parse direct lat/lon: {e}") # Check for various possible location field names in contact data location_fields = [ 'location', 'gps', 'coordinates', 'lat_lon', 'lat_lng', 'position', 'geo', 'geolocation', 'loc' ] for field in location_fields: if field in contact_data: loc_data = contact_data[field] if isinstance(loc_data, dict): # Handle structured location data if 'lat' in loc_data and 'lon' in loc_data: try: location_info['latitude'] = float(loc_data['lat']) location_info['longitude'] = float(loc_data['lon']) except (ValueError, TypeError): pass elif 'latitude' in loc_data and 'longitude' in loc_data: try: location_info['latitude'] = float(loc_data['latitude']) location_info['longitude'] = float(loc_data['longitude']) except (ValueError, TypeError): pass # Extract city/state/country if available for addr_field in ['city', 'state', 'country', 'region', 'province']: if addr_field in loc_data and loc_data[addr_field]: if addr_field == 'region' or addr_field == 'province': location_info['state'] = str(loc_data[addr_field]) else: location_info[addr_field] = str(loc_data[addr_field]) elif isinstance(loc_data, str): # Handle string location data (e.g., "lat,lon" or "city, state") if ',' in loc_data: parts = [p.strip() for p in loc_data.split(',')] if len(parts) >= 2: try: # Try to parse as coordinates lat = float(parts[0]) lon = float(parts[1]) location_info['latitude'] = lat location_info['longitude'] = lon except ValueError: # Treat as city, state format location_info['city'] = parts[0] if len(parts) > 1: location_info['state'] = parts[1] if len(parts) > 2: location_info['country'] = parts[2] # Check for individual lat/lon fields (including MeshCore-specific fields) for lat_field in ['adv_lat', 'lat', 'latitude', 'gps_lat']: if lat_field in contact_data: try: location_info['latitude'] = float(contact_data[lat_field]) break except (ValueError, TypeError): pass for lon_field in ['adv_lon', 'lon', 'lng', 'longitude', 'gps_lon', 'gps_lng']: if lon_field in contact_data: try: location_info['longitude'] = float(contact_data[lon_field]) break except (ValueError, TypeError): pass # Check for address fields for city_field in ['city', 'town', 'municipality']: if city_field in contact_data and contact_data[city_field]: location_info['city'] = str(contact_data[city_field]) break for state_field in ['state', 'province', 'region']: if state_field in contact_data and contact_data[state_field]: location_info['state'] = str(contact_data[state_field]) break for country_field in ['country', 'nation']: if country_field in contact_data and contact_data[country_field]: location_info['country'] = str(contact_data[country_field]) break # Validate coordinates if we have them if location_info['latitude'] is not None and location_info['longitude'] is not None: lat, lon = location_info['latitude'], location_info['longitude'] # Treat 0,0 coordinates as "hidden" location (common in MeshCore) if lat == 0.0 and lon == 0.0: location_info['latitude'] = None location_info['longitude'] = None # Check for valid coordinate ranges elif not (-90 <= lat <= 90) or not (-180 <= lon <= 180): # Invalid coordinates location_info['latitude'] = None location_info['longitude'] = None else: # Valid coordinates - try reverse geocoding if we don't have city/state/country and geocoding is enabled if should_geocode and (not location_info['city'] or not location_info['state'] or not location_info['country']): try: # Use reverse geocoding to get city/state/country (pass packet_hash to prevent duplicate API calls) city = self._get_city_from_coordinates(lat, lon, packet_hash=packet_hash) if city: location_info['city'] = city # Get state and country from coordinates (pass packet_hash to prevent duplicate API calls) state, country = self._get_state_country_from_coordinates(lat, lon, packet_hash=packet_hash) if state: location_info['state'] = state if country: location_info['country'] = country except Exception as e: self.logger.debug(f"Reverse geocoding failed: {e}") elif not should_geocode: self.logger.debug(f"📍 Skipping geocoding for coordinates {lat}, {lon} (location unchanged)") except Exception as e: self.logger.debug(f"Error extracting location data: {e}") return location_info def _should_geocode_location(self, location_info: dict, existing_data: Optional[dict] = None, name: str = "Unknown", packet_hash: Optional[str] = None) -> tuple[bool, dict]: """ Determine if geocoding should be performed based on location changes. Args: location_info: New location data extracted from advert existing_data: Existing location data from database (optional) name: Contact name for logging packet_hash: Optional packet hash to prevent duplicate geocoding within time window Returns: tuple: (should_geocode: bool, updated_location_info: Dict) """ should_geocode = False updated_location_info = location_info.copy() # Clean up old cache entries (older than cache window) current_time = time.time() expired_keys = [ key for key, timestamp in self.geocoding_cache.items() if current_time - timestamp > self.geocoding_cache_window ] for key in expired_keys: del self.geocoding_cache[key] # Check packet hash cache first - if we've geocoded this packet recently, skip # Skip caching for invalid/default packet hashes (all zeros) if packet_hash and packet_hash != "0000000000000000" and packet_hash in self.geocoding_cache: cache_age = current_time - self.geocoding_cache[packet_hash] if cache_age < self.geocoding_cache_window: self.logger.info(f"📍 Skipping geocoding for {name} — packet_hash {packet_hash[:16]}... geocoded {cache_age:.1f}s ago (rate-limit window {self.geocoding_cache_window}s)") # Use existing location data if available, otherwise return as-is if existing_data: updated_location_info['city'] = existing_data.get('city') updated_location_info['state'] = existing_data.get('state') updated_location_info['country'] = existing_data.get('country') return False, updated_location_info # If no existing data, only geocode if we have valid coordinates but missing location data if not existing_data: should_geocode = ( location_info['latitude'] is not None and location_info['longitude'] is not None and not (location_info['latitude'] == 0.0 and location_info['longitude'] == 0.0) and not (location_info['state'] and location_info['country'] and location_info['city']) ) if should_geocode: missing_fields = [] if not location_info.get('state'): missing_fields.append("state") if not location_info.get('country'): missing_fields.append("country") if not location_info.get('city'): missing_fields.append("city") self.logger.debug(f"📍 New contact {name}, will geocode coordinates (missing {', '.join(missing_fields)})") return should_geocode, updated_location_info # Extract existing location data existing_lat = existing_data.get('latitude', 0.0) if existing_data.get('latitude') is not None else 0.0 existing_lon = existing_data.get('longitude', 0.0) if existing_data.get('longitude') is not None else 0.0 existing_city = existing_data.get('city') existing_state = existing_data.get('state') existing_country = existing_data.get('country') # Check if we have valid coordinates in the new data if (location_info['latitude'] is not None and location_info['longitude'] is not None and not (location_info['latitude'] == 0.0 and location_info['longitude'] == 0.0)): # Use a more lenient threshold for coordinate changes (0.001 degrees ≈ 111 meters) # This prevents geocoding for minor GPS variations in stationary repeaters coordinates_changed = ( abs(location_info['latitude'] - existing_lat) > 0.001 or abs(location_info['longitude'] - existing_lon) > 0.001 ) # Check if we have sufficient location data (state AND country AND city) # City is important for display, so we should geocode if it's missing has_sufficient_location_data = existing_state and existing_country and existing_city # Only geocode if: # 1. Coordinates changed significantly (repeater moved), OR # 2. We're missing state, country, or city (incomplete location data) should_geocode = coordinates_changed or not has_sufficient_location_data if not should_geocode: # Coordinates haven't changed and we have sufficient location data # Use existing location data, no need to geocode updated_location_info['city'] = existing_city updated_location_info['state'] = existing_state updated_location_info['country'] = existing_country self.logger.debug(f"📍 Using existing location data for {name} (coordinates unchanged, has state/country/city)") elif coordinates_changed: self.logger.debug(f"📍 Location changed significantly for {name} (moved >111m), will geocode new coordinates") else: missing_fields = [] if not existing_state: missing_fields.append("state") if not existing_country: missing_fields.append("country") if not existing_city: missing_fields.append("city") self.logger.debug(f"📍 Missing {', '.join(missing_fields)} for {name}, will geocode coordinates") else: # No valid coordinates in new data, keep existing location updated_location_info['latitude'] = existing_lat if existing_lat != 0.0 else None updated_location_info['longitude'] = existing_lon if existing_lon != 0.0 else None updated_location_info['city'] = existing_city updated_location_info['state'] = existing_state updated_location_info['country'] = existing_country return should_geocode, updated_location_info def _get_existing_geocoded_data(self, latitude: float, longitude: float) -> Optional[dict[str, Optional[str]]]: """Check database for existing geocoded data for the same coordinates""" try: # Use a small tolerance for coordinate matching (0.001 degrees ≈ 111 meters) # This handles minor GPS variations while still matching the same location tolerance = 0.001 result = self.db_manager.execute_query(''' SELECT city, state, country FROM complete_contact_tracking WHERE latitude IS NOT NULL AND longitude IS NOT NULL AND ABS(latitude - ?) < ? AND ABS(longitude - ?) < ? AND (city IS NOT NULL OR state IS NOT NULL OR country IS NOT NULL) LIMIT 1 ''', (latitude, tolerance, longitude, tolerance)) if result and len(result) > 0: row = result[0] # Only return if we have at least some location data if row.get('city') or row.get('state') or row.get('country'): self.logger.debug(f"📍 Found existing geocoded data in database for coordinates {latitude}, {longitude}") return { 'city': row.get('city'), 'state': row.get('state'), 'country': row.get('country') } except Exception as e: self.logger.debug(f"Error checking database for geocoded data: {e}") return None def _get_state_country_from_coordinates(self, latitude: float, longitude: float, packet_hash: Optional[str] = None) -> tuple[Optional[str], Optional[str]]: """Get state and country from coordinates using reverse geocoding""" # Check packet hash cache first to prevent duplicate API calls if packet_hash and packet_hash != "0000000000000000": current_time = time.time() if packet_hash in self.geocoding_cache: cache_age = current_time - self.geocoding_cache[packet_hash] if cache_age < self.geocoding_cache_window: # Check database for state/country data existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data: return existing_data.get('state'), existing_data.get('country') # If no data in database, return None (don't make API call) return None, None # Check database first to avoid duplicate API calls existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data: return existing_data.get('state'), existing_data.get('country') try: # Use rate-limited Nominatim reverse geocoding location = rate_limited_nominatim_reverse_sync( self.bot, f"{latitude}, {longitude}", timeout=10 ) if location: address = location.raw.get('address', {}) # Get state/province state = (address.get('state') or address.get('province') or address.get('region')) # Get country country = address.get('country') return state, country except Exception as e: self.logger.debug(f"Reverse geocoding for state/country failed: {e}") return None, None def _get_city_from_coordinates(self, latitude: float, longitude: float, packet_hash: Optional[str] = None) -> Optional[str]: """Get city name from coordinates using reverse geocoding, with neighborhood for large cities""" # Check packet hash cache first to prevent duplicate API calls if packet_hash and packet_hash != "0000000000000000": current_time = time.time() if packet_hash in self.geocoding_cache: cache_age = current_time - self.geocoding_cache[packet_hash] if cache_age < self.geocoding_cache_window: # Check database for city data existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data and existing_data.get('city'): return existing_data.get('city') # If no city in database, return None (don't make API call) return None # Check database first to avoid duplicate API calls existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data and existing_data.get('city'): return existing_data.get('city') try: # Use rate-limited Nominatim reverse geocoding location = rate_limited_nominatim_reverse_sync( self.bot, f"{latitude}, {longitude}", timeout=10 ) if location: address = location.raw.get('address', {}) # Get city name from various fields (in order of preference) city = (address.get('city') or address.get('town') or address.get('village') or address.get('hamlet') or address.get('municipality') or address.get('suburb')) # If no city found, try county as fallback (for rural areas) # Keep "County" in the name to disambiguate from cities with the same name if not city: county = address.get('county') if county: # Keep full county name to distinguish from cities (e.g., "Snohomish County" vs "Snohomish" city) city = county # Keep "County" suffix to avoid ambiguity self.logger.debug(f"Using county '{county}' as location name for coordinates {latitude}, {longitude}") if city: # For large cities, try to get neighborhood information neighborhood = self._get_neighborhood_for_large_city(address, city) if neighborhood: return f"{neighborhood}, {city}" else: return city return None except Exception as e: self.logger.debug(f"Error getting city from coordinates {latitude}, {longitude}: {e}") return None def _get_full_location_from_coordinates(self, latitude: float, longitude: float, packet_hash: Optional[str] = None) -> dict[str, Optional[str]]: """Get complete location information (city, state, country) from coordinates using reverse geocoding""" location_info: dict[str, Optional[str]] = { 'city': None, 'state': None, 'country': None } try: # Validate coordinates first if latitude == 0.0 and longitude == 0.0: self.logger.debug(f"Skipping geocoding for hidden location: {latitude}, {longitude}") return location_info # Check for valid coordinate ranges if not (-90 <= latitude <= 90) or not (-180 <= longitude <= 180): self.logger.debug(f"Skipping geocoding for invalid coordinates: {latitude}, {longitude}") return location_info # Check packet hash cache first (before database check) if packet_hash and packet_hash != "0000000000000000": current_time = time.time() if packet_hash in self.geocoding_cache: cache_age = current_time - self.geocoding_cache[packet_hash] if cache_age < self.geocoding_cache_window: self.logger.debug(f"📍 Skipping geocoding API call for packet_hash {packet_hash[:16]}... (geocoded {cache_age:.1f}s ago)") # Still check database for location data existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data: return existing_data # If no database data, return empty (don't make API call) return location_info # Check database first for existing geocoded data existing_data = self._get_existing_geocoded_data(latitude, longitude) if existing_data: return existing_data # Check cache second to avoid duplicate API calls cache_key = f"location_{latitude:.6f}_{longitude:.6f}" cached_result = self.db_manager.get_cached_json(cache_key, "geolocation") if cached_result: self.logger.debug(f"Using cached location data for {latitude}, {longitude}") return cached_result # Use rate-limited Nominatim reverse geocoding self.logger.debug(f"Calling Nominatim reverse geocoding for {latitude}, {longitude}") location = rate_limited_nominatim_reverse_sync( self.bot, f"{latitude}, {longitude}", timeout=10 ) if location: address = location.raw.get('address', {}) self.logger.debug(f"Geocoding API returned address data: {list(address.keys())}") # Get city name from various fields (in order of preference) city = (address.get('city') or address.get('town') or address.get('village') or address.get('hamlet') or address.get('municipality') or address.get('suburb')) # If no city found, try county as fallback (for rural areas) # Keep "County" in the name to disambiguate from cities with the same name if not city: county = address.get('county') if county: # Keep full county name to distinguish from cities (e.g., "Snohomish County" vs "Snohomish" city) city = county # Keep "County" suffix to avoid ambiguity self.logger.debug(f"Using county '{county}' as location name for coordinates {latitude}, {longitude}") if city: # For large cities, try to get neighborhood information neighborhood = self._get_neighborhood_for_large_city(address, city) if neighborhood: location_info['city'] = f"{neighborhood}, {city}" else: location_info['city'] = city self.logger.debug(f"Extracted city: {location_info['city']}") # Get state/province information (don't use county here since we may have used it for city) state = (address.get('state') or address.get('province') or address.get('region')) if state: location_info['state'] = state self.logger.debug(f"Extracted state: {state}") # Get country information country = (address.get('country') or address.get('country_code')) if country: location_info['country'] = country self.logger.debug(f"Extracted country: {country}") else: self.logger.warning(f"Geocoding API returned no location for {latitude}, {longitude}") # Cache the result for 30 days - geolocation data is very stable self.db_manager.cache_json(cache_key, location_info, "geolocation", cache_hours=720) return location_info except Exception as e: error_msg = str(e) if "No route to host" in error_msg or "Connection" in error_msg: self.logger.warning(f"Network error geocoding {latitude}, {longitude}: {error_msg}") else: self.logger.debug(f"Error getting full location from coordinates {latitude}, {longitude}: {e}") return location_info def _get_neighborhood_for_large_city(self, address: dict, city: str) -> Optional[str]: """Get neighborhood information for large cities""" try: # List of large cities where neighborhood info is useful large_cities = [ 'seattle', 'portland', 'san francisco', 'los angeles', 'san diego', 'chicago', 'new york', 'boston', 'philadelphia', 'washington', 'atlanta', 'miami', 'houston', 'dallas', 'austin', 'denver', 'phoenix', 'las vegas', 'minneapolis', 'detroit', 'cleveland', 'pittsburgh', 'baltimore', 'richmond', 'norfolk', 'tampa', 'orlando', 'jacksonville', 'nashville', 'memphis', 'kansas city', 'st louis', 'milwaukee', 'cincinnati', 'columbus', 'indianapolis', 'louisville', 'lexington', 'charlotte', 'raleigh', 'greensboro', 'winston-salem', 'durham', 'charleston', 'columbia', 'greenville', 'savannah', 'augusta', 'macon', 'columbus', 'atlanta' ] # Check if this is a large city if city.lower() not in large_cities: return None # Try to get neighborhood information from various address fields neighborhood_fields = [ 'neighbourhood', 'neighborhood', 'suburb', 'quarter', 'district', 'area', 'locality', 'hamlet', 'village', 'town' ] for field in neighborhood_fields: if field in address and address[field]: neighborhood = address[field] # Skip if it's the same as the city name if neighborhood.lower() != city.lower(): return neighborhood # For Seattle specifically, try to get more specific area info if city.lower() == 'seattle': # Check for specific Seattle neighborhoods/areas seattle_areas = [ 'capitol hill', 'ballard', 'fremont', 'queen anne', 'belltown', 'pioneer square', 'international district', 'chinatown', 'first hill', 'central district', 'central', 'beacon hill', 'columbia city', 'rainier valley', 'west seattle', 'alki', 'magnolia', 'greenwood', 'phinney ridge', 'wallingford', 'university district', 'udistrict', 'ravenna', 'laurelhurst', 'sand point', 'wedgwood', 'view ridge', 'matthews beach', 'lake city', 'bitter lake', 'broadview', 'crown hill', 'loyal heights', 'sunset hill', 'interbay', 'downtown', 'south lake union', 'denny triangle', 'denny regrade', 'eastlake', 'montlake', 'madison park', 'madrona', 'leschi', 'mount baker', 'columbia city', 'rainier beach', 'south park', 'georgetown', 'soho', 'industrial district' ] # Check if any of the address fields contain Seattle neighborhood names for field, value in address.items(): if isinstance(value, str): value_lower = value.lower() for area in seattle_areas: if area in value_lower: return area.title() return None except Exception as e: self.logger.debug(f"Error getting neighborhood for {city}: {e}") return None def _is_repeater_device(self, contact_data: dict) -> bool: """Check if a contact is a repeater or room server using available contact data""" try: # Primary detection: Check device type field # Based on the actual contact data structure: # device_type 2 = repeater, device_type 3 = room server device_type = contact_data.get('type') if device_type in [2, 3]: return True # Secondary detection: Check for role fields in contact data role_fields = ['role', 'device_role', 'mode', 'device_type'] for field in role_fields: value = contact_data.get(field, '') if value and isinstance(value, str): value_lower = value.lower() if any(role in value_lower for role in ['repeater', 'roomserver', 'room_server']): return True # Tertiary detection: Check advertisement flags # Some repeaters have specific flags that indicate their function flags = contact_data.get('flags', contact_data.get('advert_flags', '')) if flags and isinstance(flags, (int, str)): flags_str = str(flags).lower() if any(role in flags_str for role in ['repeater', 'roomserver', 'room_server']): return True # Quaternary detection: Check name patterns with validation name = contact_data.get('adv_name', contact_data.get('name', '')).lower() if name: # Strong repeater indicators strong_indicators = ['repeater', 'roompeater', 'room server', 'roomserver', 'relay', 'gateway'] if any(indicator in name for indicator in strong_indicators): return True # Room server indicators room_indicators = ['room', 'rs ', 'rs-', 'rs_'] if any(indicator in name for indicator in room_indicators): # Additional validation to avoid false positives user_indicators = ['user', 'person', 'mobile', 'phone', 'device', 'pager'] if not any(user_indicator in name for user_indicator in user_indicators): return True # Quinary detection: Check path characteristics # Some repeaters have specific path patterns out_path_len = contact_data.get('out_path_len', -1) if out_path_len == 0: # Direct connection might indicate repeater # Additional validation with name check if name and any(indicator in name for indicator in ['repeater', 'room', 'relay']): return True return False except Exception as e: self.logger.error(f"Error checking if device is repeater: {e}") return False def _is_companion_device(self, contact_data: dict) -> bool: """Check if a contact is a companion (human user, not a repeater)""" try: # Companion is simply the inverse of repeater return not self._is_repeater_device(contact_data) except Exception as e: self.logger.error(f"Error checking if device is companion: {e}") return False def _is_in_acl(self, public_key: str) -> bool: """Check if a public key is in the bot's admin ACL (should never be purged)""" try: if not hasattr(self.bot, 'config') or not self.bot.config.has_section('Admin_ACL'): return False # Get admin pubkeys from config admin_pubkeys = self.bot.config.get('Admin_ACL', 'admin_pubkeys', fallback='') if not admin_pubkeys: return False # Parse admin pubkeys admin_pubkey_list = [key.strip() for key in admin_pubkeys.split(',') if key.strip()] if not admin_pubkey_list: return False # Check if public key matches any admin key (exact match required for security) return any(public_key == admin_key for admin_key in admin_pubkey_list) except Exception as e: self.logger.error(f"Error checking ACL membership: {e}") return False # Default to not in ACL on error (safer) def _get_last_dm_activity(self, public_key: str, sender_id: Optional[str] = None) -> Optional[datetime]: """Get the timestamp of the last DM from a contact""" try: # Try to find sender_id from contact if not provided if not sender_id: # Try to get sender_id from device contacts if hasattr(self.bot.meshcore, 'contacts'): for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if contact_data.get('public_key', contact_key) == public_key: sender_id = contact_data.get('name', contact_data.get('adv_name', '')) break if not sender_id: # Try to get from complete_contact_tracking tracking_data = self.db_manager.execute_query( 'SELECT name FROM complete_contact_tracking WHERE public_key = ? LIMIT 1', (public_key,) ) if tracking_data: sender_id = tracking_data[0]['name'] if not sender_id: return None # Query message_stats for last DM query = ''' SELECT MAX(timestamp) as last_dm_timestamp FROM message_stats WHERE sender_id = ? AND is_dm = 1 ''' results = self.db_manager.execute_query(query, (sender_id,)) if results and results[0]['last_dm_timestamp']: timestamp = results[0]['last_dm_timestamp'] # Convert to datetime if isinstance(timestamp, (int, float)): return datetime.fromtimestamp(timestamp) elif isinstance(timestamp, str): try: return datetime.fromisoformat(timestamp.replace('Z', '+00:00')) except: return None return None except Exception as e: self.logger.debug(f"Error getting last DM activity for {public_key}: {e}") return None def _get_last_advert_activity(self, public_key: str) -> Optional[datetime]: """Get the timestamp of the last advert from a contact""" try: # Query complete_contact_tracking for last advert query = ''' SELECT last_advert_timestamp, last_heard FROM complete_contact_tracking WHERE public_key = ? AND role = 'companion' LIMIT 1 ''' results = self.db_manager.execute_query(query, (public_key,)) if results: # Prefer last_advert_timestamp, fallback to last_heard timestamp = results[0].get('last_advert_timestamp') or results[0].get('last_heard') if timestamp: # Convert to datetime if isinstance(timestamp, datetime): return timestamp elif isinstance(timestamp, str): try: return datetime.fromisoformat(timestamp.replace('Z', '+00:00')) except: # Try parsing as timestamp try: return datetime.fromtimestamp(float(timestamp)) except: return None elif isinstance(timestamp, (int, float)): return datetime.fromtimestamp(timestamp) return None except Exception as e: self.logger.debug(f"Error getting last advert activity for {public_key}: {e}") return None async def scan_and_catalog_repeaters(self) -> int: """Scan current contacts and catalog any repeaters found""" # Wait for contacts to be loaded if they're not ready yet if not hasattr(self.bot.meshcore, 'contacts') or not self.bot.meshcore.contacts: self.logger.info("Contacts not loaded yet, waiting...") # Wait up to 10 seconds for contacts to load for _i in range(20): # 20 * 0.5 = 10 seconds await asyncio.sleep(0.5) if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts: break else: self.logger.warning("No contacts available to scan for repeaters after waiting") return 0 contacts = self.bot.meshcore.contacts self.logger.info(f"Scanning {len(contacts)} contacts for repeaters...") cataloged_count = 0 updated_count = 0 processed_count = 0 try: for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): processed_count += 1 # Log progress every 20 contacts if processed_count % 20 == 0: self.logger.info(f"Scan progress: {processed_count}/{len(contacts)} contacts processed, {cataloged_count} repeaters found") # Debug logging for first few contacts to understand structure if processed_count <= 5: self.logger.debug(f"Contact {processed_count}: {contact_data.get('name', 'Unknown')} (type: {contact_data.get('type')}, keys: {list(contact_data.keys())})") if self._is_repeater_device(contact_data): public_key = contact_data.get('public_key', contact_key) name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) self.logger.info(f"Found repeater: {name} (type: {contact_data.get('type')}, key: {public_key[:16]}...)") # Determine device type based on contact data contact_type = contact_data.get('type') if contact_type == 3: device_type = 'RoomServer' elif contact_type == 2: device_type = 'Repeater' else: # Fallback to name-based detection device_type = 'Repeater' if 'room' in name.lower() or 'server' in name.lower(): device_type = 'RoomServer' # Extract location data from contact_data location_info = self._extract_location_data(contact_data, should_geocode=False) # Check if already exists and get existing location data existing = self.db_manager.execute_query( 'SELECT id, last_seen, latitude, longitude, city FROM repeater_contacts WHERE public_key = ?', (public_key,) ) # Check if we need to perform geocoding based on location changes existing_data = None if existing: existing_data = { 'latitude': existing[0][2], 'longitude': existing[0][3], 'city': existing[0][4] } should_geocode, location_info = self._should_geocode_location(location_info, existing_data, name) if should_geocode: city_from_coords = self._get_city_from_coordinates( location_info['latitude'], location_info['longitude'] ) if city_from_coords: location_info['city'] = city_from_coords if existing: # Update last_seen timestamp and location data if available update_query = 'UPDATE repeater_contacts SET last_seen = CURRENT_TIMESTAMP, is_active = 1' update_params = [] # Add location fields if we have new data if location_info['latitude'] is not None: update_query += ', latitude = ?' update_params.append(location_info['latitude']) if location_info['longitude'] is not None: update_query += ', longitude = ?' update_params.append(location_info['longitude']) if location_info['city']: update_query += ', city = ?' update_params.append(location_info['city']) if location_info['state']: update_query += ', state = ?' update_params.append(location_info['state']) if location_info['country']: update_query += ', country = ?' update_params.append(location_info['country']) update_query += ' WHERE public_key = ?' update_params.append(public_key) self.db_manager.execute_update(update_query, tuple(update_params)) updated_count += 1 else: # Insert new repeater with location data self.db_manager.execute_update(''' INSERT INTO repeater_contacts (public_key, name, device_type, contact_data, latitude, longitude, city, state, country) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( public_key, name, device_type, json.dumps(contact_data), location_info['latitude'], location_info['longitude'], location_info['city'], location_info['state'], location_info['country'] )) # Log the addition self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('added', ?, ?, 'Auto-detected during contact scan') ''', (public_key, name)) cataloged_count += 1 location_str = "" if location_info['city'] or location_info['latitude']: if location_info['city']: location_str = f" in {location_info['city']}" if location_info['state']: location_str += f", {location_info['state']}" elif location_info['latitude'] and location_info['longitude']: location_str = f" at {location_info['latitude']:.4f}, {location_info['longitude']:.4f}" self.logger.info(f"Cataloged new repeater: {name} ({device_type}){location_str}") except Exception as e: self.logger.error(f"Error scanning contacts for repeaters: {e}") if cataloged_count > 0: self.logger.info(f"Cataloged {cataloged_count} new repeaters") if updated_count > 0: self.logger.info(f"Updated {updated_count} existing repeaters with location data") self.logger.info(f"Scan completed: {cataloged_count} new repeaters cataloged, {updated_count} existing repeaters updated from {len(contacts)} contacts") self.logger.info(f"Scan summary: {processed_count} contacts processed, {cataloged_count + updated_count} repeaters processed") return cataloged_count async def get_repeater_contacts(self, active_only: bool = True) -> list[dict]: """Get list of repeater contacts from database""" try: query = 'SELECT * FROM repeater_contacts' if active_only: query += ' WHERE is_active = 1' query += ' ORDER BY last_seen DESC' return self.db_manager.execute_query(query) except Exception as e: self.logger.error(f"Error retrieving repeater contacts: {e}") return [] async def test_meshcore_cli_commands(self) -> dict[str, Any]: """Test if meshcore-cli commands are working properly""" results: dict[str, Any] = {} try: from meshcore_cli.meshcore_cli import next_cmd # Test a simple command that should always work try: result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["help"]), timeout=10.0 ) results['help'] = result is not None self.logger.info(f"meshcore-cli help command test: {'PASS' if results['help'] else 'FAIL'}") except Exception as e: results['help'] = False self.logger.warning(f"meshcore-cli help command test FAILED: {e}") # Test remove_contact command (we'll use a dummy key) try: result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["remove_contact", "dummy_key"]), timeout=10.0 ) # Even if it fails, if we get here without "Unknown command" error, the command exists results['remove_contact'] = True self.logger.info("meshcore-cli remove_contact command test: PASS") except Exception as e: if "Unknown command" in str(e): results['remove_contact'] = False self.logger.error(f"meshcore-cli remove_contact command test FAILED: {e}") else: # Command exists but failed for other reasons (expected with dummy key) results['remove_contact'] = True self.logger.info("meshcore-cli remove_contact command test: PASS (command exists)") except Exception as e: self.logger.error(f"Error testing meshcore-cli commands: {e}") results['error'] = str(e) return results async def purge_repeater_from_contacts(self, public_key: str, reason: str = "Manual purge") -> bool: """Remove a specific repeater from the device's contact list using proper MeshCore API""" if not self._start_purge_attempt(public_key, "repeater"): return True self.logger.info(f"Starting purge process for public_key: {public_key}") self.logger.debug(f"Purge reason: {reason}") try: # meshcore.contacts is keyed by public_key hex string contact_to_remove = self.bot.meshcore.contacts.get(public_key) if not contact_to_remove: contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) if not contact_to_remove: self.logger.warning(f"Repeater with public key {public_key} not found in current contacts") return False contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) self.logger.debug(f"Found contact: {contact_name}") # Check if repeater exists in database, if not add it first existing_repeater = self.db_manager.execute_query( 'SELECT id FROM repeater_contacts WHERE public_key = ?', (public_key,) ) if not existing_repeater: device_type = 'Repeater' if contact_to_remove.get('type') == 3 or 'room' in contact_name.lower() or 'server' in contact_name.lower(): device_type = 'RoomServer' self.db_manager.execute_update(''' INSERT INTO repeater_contacts (public_key, name, device_type, contact_data) VALUES (?, ?, ?, ?) ''', ( public_key, contact_name, device_type, json.dumps(contact_to_remove) )) self.logger.info(f"Added repeater {contact_name} to database before purging") # Check if contact is already gone from device if public_key not in self.bot.meshcore.contacts: self.logger.info(f"✅ Contact '{contact_name}' not found in device contacts (already removed) - treating as success") device_removal_successful = True else: # Remove the contact using the proper MeshCore API device_removal_successful = False self.logger.info(f"Removing contact '{contact_name}' from device using MeshCore API...") try: result = await self.bot.meshcore.commands.remove_contact(public_key) if result.type == EventType.OK: device_removal_successful = True self.logger.info(f"✅ Successfully removed contact '{contact_name}' from device") elif result.type == EventType.ERROR: error_code = result.payload.get('error_code') reason_str = result.payload.get('reason') if error_code == 2: # Device says contact not found - treat as success self.logger.info(f"✅ Contact '{contact_name}' not found on device (already removed) - treating as success") device_removal_successful = True else: self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") except Exception as e: self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") # Only mark as inactive in database if device removal was successful if device_removal_successful: self.db_manager.execute_update( 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', (public_key,) ) # Log the purge action self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purged', ?, ?, ?) ''', (public_key, contact_name, reason)) self.logger.info(f"Successfully purged repeater {contact_name}: {reason}") return True else: self.logger.error(f"Failed to remove repeater {contact_name} from device - not marking as purged in database") self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False except Exception as e: self.logger.error(f"Error purging repeater {public_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") return False finally: self._finish_purge_attempt(public_key) async def purge_companion_from_contacts(self, public_key: str, reason: str = "Manual purge") -> bool: """Remove a companion contact from the device's contact list""" if not self._start_purge_attempt(public_key, "companion"): return True self.logger.info(f"Starting companion purge process for public_key: {public_key}") self.logger.debug(f"Purge reason: {reason}") try: # Safety check: Never purge ACL members if self._is_in_acl(public_key): self.logger.warning(f"❌ Attempted to purge companion in ACL - BLOCKED: {public_key[:16]}...") return False # meshcore.contacts is keyed by public_key hex string contact_to_remove = self.bot.meshcore.contacts.get(public_key) if not contact_to_remove: contact_to_remove = self.bot.meshcore.get_contact_by_key_prefix(public_key[:8]) if not contact_to_remove: self.logger.warning(f"Companion with public key {public_key} not found in current contacts") return False if not self._is_companion_device(contact_to_remove): self.logger.warning(f"Contact {public_key} is not a companion - skipping") return False contact_name = contact_to_remove.get('adv_name', contact_to_remove.get('name', 'Unknown')) self.logger.debug(f"Found companion: {contact_name}") # Check if contact is already gone from device if public_key not in self.bot.meshcore.contacts: self.logger.info(f"✅ Contact '{contact_name}' not found in device contacts (already removed) - treating as success") device_removal_successful = True else: # Remove the contact using the proper MeshCore API device_removal_successful = False self.logger.info(f"Removing companion '{contact_name}' from device using MeshCore API...") try: result = await self.bot.meshcore.commands.remove_contact(public_key) if result.type == EventType.OK: device_removal_successful = True self.logger.info(f"✅ Successfully removed companion '{contact_name}' from device") elif result.type == EventType.ERROR: error_code = result.payload.get('error_code') reason_str = result.payload.get('reason') if error_code == 2: # Device says contact not found - treat as success self.logger.info(f"✅ Companion '{contact_name}' not found on device (already removed) - treating as success") device_removal_successful = True else: self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") except Exception as e: self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") if device_removal_successful: # Remove from local cache optimistically, then refresh from device self.bot.meshcore.contacts.pop(public_key, None) self.logger.debug(f"Removed '{contact_name}' from local contacts cache") await asyncio.sleep(2.0) try: await self.bot.meshcore.commands.get_contacts() self.logger.debug("Refreshed contacts from device") except Exception as e: self.logger.debug(f"Could not refresh contacts from device: {e}") # Update tracking database if device removal was successful if device_removal_successful: self.db_manager.execute_update( 'UPDATE complete_contact_tracking SET is_currently_tracked = 0 WHERE public_key = ?', (public_key,) ) self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('companion_purged', ?, ?, ?) ''', (public_key, contact_name, reason)) self.logger.info(f"✅ Successfully purged companion {contact_name}: {reason}") return True else: self.logger.error(f"Failed to remove companion {contact_name} from device - not marking as purged in database") self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('companion_purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False except Exception as e: self.logger.error(f"Error purging companion {public_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") return False finally: self._finish_purge_attempt(public_key) async def purge_repeater_by_contact_key(self, contact_key: str, reason: str = "Manual purge") -> bool: """Remove a repeater using the contact key (public_key hex) from the device's contact list""" self.logger.info(f"Starting purge process for contact_key: {contact_key}") self.logger.debug(f"Purge reason: {reason}") try: # meshcore.contacts is keyed by public_key hex - contact_key IS the public_key contact_data = self.bot.meshcore.contacts.get(contact_key) if not contact_data: self.logger.warning(f"Contact with key {contact_key} not found in current contacts") return False contact_name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) public_key = contact_data.get('public_key', contact_key) self.logger.info(f"Found contact: {contact_name} (public_key: {public_key[:16]}...)") # Check if repeater exists in database, if not add it first existing_repeater = self.db_manager.execute_query( 'SELECT id FROM repeater_contacts WHERE public_key = ?', (public_key,) ) if not existing_repeater: device_type = 'Repeater' if contact_data.get('type') == 3 or 'room' in contact_name.lower() or 'server' in contact_name.lower(): device_type = 'RoomServer' self.db_manager.execute_update(''' INSERT INTO repeater_contacts (public_key, name, device_type, contact_data) VALUES (?, ?, ?, ?) ''', ( public_key, contact_name, device_type, json.dumps(contact_data) )) self.logger.info(f"Added repeater {contact_name} to database before purging") # Remove the contact using the proper MeshCore API device_removal_successful = False self.logger.info(f"Removing contact '{contact_name}' from device using MeshCore API...") try: result = await self.bot.meshcore.commands.remove_contact(public_key) if result.type == EventType.OK: device_removal_successful = True self.logger.info(f"✅ Successfully removed contact '{contact_name}' from device") elif result.type == EventType.ERROR: error_code = result.payload.get('error_code') reason_str = result.payload.get('reason') if error_code == 2: # Device says contact not found - treat as success self.logger.info(f"✅ Contact '{contact_name}' not found on device (already removed) - treating as success") device_removal_successful = True else: self.logger.error(f"❌ remove_contact failed for '{contact_name}': device_error_code={error_code}, lib_reason={reason_str}, payload={result.payload}") except Exception as e: self.logger.error(f"❌ Exception calling remove_contact for '{contact_name}': {type(e).__name__}: {e}") if device_removal_successful: # Remove from local cache, then refresh from device self.bot.meshcore.contacts.pop(public_key, None) self.logger.debug(f"Removed '{contact_name}' from local contacts cache") await asyncio.sleep(2.0) try: await self.bot.meshcore.commands.get_contacts() self.logger.debug("Contacts refreshed from device") except Exception as e: self.logger.debug(f"Could not refresh contacts from device: {e}") # Only mark as inactive in database if device removal was successful if device_removal_successful: self.db_manager.execute_update( 'UPDATE repeater_contacts SET is_active = 0, purge_count = purge_count + 1 WHERE public_key = ?', (public_key,) ) self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purged', ?, ?, ?) ''', (public_key, contact_name, reason)) self.logger.info(f"Successfully purged repeater {contact_name}: {reason}") return True else: self.logger.error(f"Failed to remove repeater {contact_name} from device - not marking as purged in database") self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('purge_failed', ?, ?, ?) ''', (public_key, contact_name, f"{reason} - Device removal failed")) return False except Exception as e: self.logger.error(f"Error purging repeater {contact_key}: {e}") self.logger.debug(f"Error type: {type(e).__name__}") return False async def purge_old_repeaters(self, days_old: int = 30, reason: str = "Automatic purge - old contacts") -> int: """Purge repeaters that haven't been seen in specified days""" try: cutoff_date = datetime.now() - timedelta(days=days_old) # Find old repeaters by checking their actual last_advert time from contact data # We need to cross-reference the database with the current contact data old_repeaters = [] # Get all active repeaters from database all_repeaters = self.db_manager.execute_query(''' SELECT public_key, name FROM repeater_contacts WHERE is_active = 1 ''') # Check each repeater's actual last_advert time for repeater in all_repeaters: public_key = repeater['public_key'] name = repeater['name'] # Find the contact in meshcore.contacts for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if contact_data.get('public_key', contact_key) == public_key: # Check the actual last_advert time last_advert = contact_data.get('last_advert') if last_advert: try: # Parse the last_advert timestamp if isinstance(last_advert, str): last_advert_dt = datetime.fromisoformat(last_advert.replace('Z', '+00:00')) elif isinstance(last_advert, (int, float)): # Unix timestamp (seconds since epoch) last_advert_dt = datetime.fromtimestamp(last_advert) else: # Assume it's already a datetime object last_advert_dt = last_advert # Check if it's older than cutoff if last_advert_dt < cutoff_date: old_repeaters.append({ 'public_key': public_key, 'name': name, 'last_seen': last_advert }) self.logger.debug(f"Found old repeater: {name} (last_advert: {last_advert} -> {last_advert_dt})") else: self.logger.debug(f"Recent repeater: {name} (last_advert: {last_advert} -> {last_advert_dt})") except Exception as e: self.logger.debug(f"Error parsing last_advert for {name}: {e} (type: {type(last_advert)}, value: {last_advert})") break # Debug logging self.logger.info(f"Purge criteria: cutoff_date = {cutoff_date.isoformat()}, days_old = {days_old}") self.logger.info(f"Found {len(old_repeaters)} repeaters older than {days_old} days") # Show some examples of what we found if old_repeaters: for i, repeater in enumerate(old_repeaters[:3]): # Show first 3 self.logger.info(f"Old repeater {i+1}: {repeater['name']} (last_advert: {repeater['last_seen']})") else: # Show some recent repeaters to understand the timestamp format self.logger.info("No old repeaters found. Showing recent repeater activity:") recent_count = 0 for contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if self._is_repeater_device(contact_data): last_advert = contact_data.get('last_advert', 'No last_advert') name = contact_data.get('adv_name', contact_data.get('name', 'Unknown')) if last_advert != 'No last_advert': try: if isinstance(last_advert, (int, float)): last_advert_dt = datetime.fromtimestamp(last_advert) self.logger.info(f" {name}: {last_advert} (Unix timestamp) -> {last_advert_dt}") else: self.logger.info(f" {name}: {last_advert} (type: {type(last_advert)})") except Exception as e: self.logger.info(f" {name}: {last_advert} (parse error: {e})") else: self.logger.info(f" {name}: No last_advert") recent_count += 1 if recent_count >= 3: break purged_count = 0 # Process repeaters with delays to avoid overwhelming LoRa network self.logger.info(f"Starting batch purge of {len(old_repeaters)} old repeaters...") start_time = asyncio.get_event_loop().time() for i, repeater in enumerate(old_repeaters): public_key = repeater['public_key'] name = repeater['name'] self.logger.info(f"Purging repeater {i+1}/{len(old_repeaters)}: {name}") self.logger.debug(f"Processing public_key: {public_key}") try: if await self.purge_repeater_from_contacts(public_key, f"{reason} (last seen: {cutoff_date.date()})"): purged_count += 1 self.logger.info(f"Successfully purged {i+1}/{len(old_repeaters)}: {name}") else: self.logger.warning(f"Failed to purge {i+1}/{len(old_repeaters)}: {name}") except Exception as e: self.logger.error(f"Exception purging {i+1}/{len(old_repeaters)}: {name} - {e}") # Add delay between removals to avoid overwhelming LoRa network if i < len(old_repeaters) - 1: # Don't delay after the last one self.logger.debug("Waiting 2 seconds before next removal...") await asyncio.sleep(2) # 2 second delay between removals end_time = asyncio.get_event_loop().time() total_duration = end_time - start_time self.logger.info(f"Batch purge completed in {total_duration:.2f} seconds") # After purging, toggle auto-add off and discover new contacts manually if purged_count > 0: await self._post_purge_contact_management() self.logger.info(f"Purged {purged_count} old repeaters (older than {days_old} days)") return purged_count except Exception as e: self.logger.error(f"Error purging old repeaters: {e}") return 0 async def _post_purge_contact_management(self): """Post-purge contact management: enable manual contact addition and discover new contacts manually""" try: self.logger.info("Starting post-purge contact management...") # Step 1: Enable manual contact addition self.logger.info("Enabling manual contact addition on device...") try: from meshcore_cli.meshcore_cli import next_cmd result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["set_manual_add_contacts", "true"]), timeout=15.0 ) self.logger.info("Successfully enabled manual contact addition") self.logger.debug(f"Manual add contacts enable result: {result}") except asyncio.TimeoutError: self.logger.warning("Timeout enabling manual contact addition (LoRa communication)") except Exception as e: self.logger.error(f"Failed to enable manual contact addition: {e}") # Step 2: Discover new companion contacts manually self.logger.info("Starting manual companion contact discovery...") try: from meshcore_cli.meshcore_cli import next_cmd result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["discover_companion_contacts"]), timeout=30.0 ) self.logger.info("Successfully initiated companion contact discovery") self.logger.debug(f"Discovery result: {result}") except asyncio.TimeoutError: self.logger.warning("Timeout during companion contact discovery (LoRa communication)") except Exception as e: self.logger.error(f"Failed to discover companion contacts: {e}") # Step 3: Log the post-purge management action self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('post_purge_management', 'Enabled manual contact addition and initiated companion contact discovery') ) self.logger.info("Post-purge contact management completed") except Exception as e: self.logger.error(f"Error in post-purge contact management: {e}") async def get_contact_list_status(self) -> dict: """Get current contact list status and limits""" try: # Get current contact count current_contacts = len(self.bot.meshcore.contacts) if hasattr(self.bot.meshcore, 'contacts') else 0 # Update contact limit from device info await self._update_contact_limit_from_device() # Use the updated contact limit estimated_limit = self.contact_limit # Calculate usage percentage usage_percentage = (current_contacts / estimated_limit) * 100 if estimated_limit > 0 else 0 # Count repeaters from actual device contacts (more accurate than database) device_repeater_count = 0 if hasattr(self.bot.meshcore, 'contacts'): for _contact_key, contact_data in list(self.bot.meshcore.contacts.items()): if self._is_repeater_device(contact_data): device_repeater_count += 1 # Also get database repeater count for reference db_repeater_count = len(await self.get_repeater_contacts(active_only=True)) # Use device count as primary, fall back to database count repeater_count = device_repeater_count if device_repeater_count > 0 else db_repeater_count # Calculate companion count (total contacts minus repeaters) companion_count = current_contacts - repeater_count # Get contacts without recent adverts (potential candidates for removal) stale_contacts = await self._get_stale_contacts() return { 'current_contacts': current_contacts, 'estimated_limit': estimated_limit, 'usage_percentage': usage_percentage, 'repeater_count': repeater_count, 'companion_count': companion_count, 'stale_contacts_count': len(stale_contacts), 'available_slots': max(0, estimated_limit - current_contacts), 'is_near_limit': usage_percentage > 80, # Warning at 80% 'is_at_limit': usage_percentage >= 95, # Critical at 95% 'stale_contacts': stale_contacts[:10] # Top 10 stale contacts } except Exception as e: self.logger.error(f"Error getting contact list status: {e}") return {} async def _get_stale_contacts(self, days_without_advert: int = 7) -> list[dict]: """Get contacts that haven't sent adverts in specified days""" try: cutoff_date = datetime.now() - timedelta(days=days_without_advert) # Get contacts from device if not hasattr(self.bot.meshcore, 'contacts'): return [] stale_contacts = [] for _contact_key, contact_data in list(self.bot.meshcore.contacts.items()): # Skip repeaters (they're managed separately) if self._is_repeater_device(contact_data): continue # Check last_seen or similar timestamp fields last_seen = contact_data.get('last_seen', contact_data.get('last_advert', contact_data.get('timestamp'))) if last_seen: try: # Parse timestamp if isinstance(last_seen, str): last_seen_dt = datetime.fromisoformat(last_seen.replace('Z', '+00:00')) elif isinstance(last_seen, (int, float)): # Unix timestamp (seconds since epoch) last_seen_dt = datetime.fromtimestamp(last_seen) else: # Assume it's already a datetime object last_seen_dt = last_seen if last_seen_dt < cutoff_date: stale_contacts.append({ 'name': contact_data.get('name', contact_data.get('adv_name', 'Unknown')), 'public_key': contact_data.get('public_key', ''), 'last_seen': last_seen, 'days_stale': (datetime.now() - last_seen_dt).days }) except Exception as e: self.logger.debug(f"Error parsing timestamp for contact {contact_data.get('name', 'Unknown')}: {e}") continue # Sort by days stale (oldest first) stale_contacts.sort(key=lambda x: x['days_stale'], reverse=True) return stale_contacts except Exception as e: self.logger.error(f"Error getting stale contacts: {e}") return [] async def manage_contact_list(self, auto_cleanup: bool = True) -> dict: """Manage contact list to prevent hitting limits""" try: status = await self.get_contact_list_status() if not status: return {'error': 'Failed to get contact list status'} actions_taken = [] # If near limit, start cleanup if status['is_near_limit']: self.logger.warning(f"Contact list at {status['usage_percentage']:.1f}% capacity ({status['current_contacts']}/{status['estimated_limit']})") if auto_cleanup: # Step 1: Remove stale contacts stale_removed = await self._remove_stale_contacts(status['stale_contacts']) if stale_removed > 0: actions_taken.append(f"Removed {stale_removed} stale contacts") # Step 2: If still near limit, remove old repeaters if status['is_near_limit'] and status['repeater_count'] > 0: old_repeaters_removed = await self.purge_old_repeaters(days_old=14, reason="Contact list management - near limit") if old_repeaters_removed > 0: actions_taken.append(f"Removed {old_repeaters_removed} old repeaters") # Step 3: If still at critical limit, more aggressive cleanup if status['is_at_limit']: self.logger.warning("Contact list at critical capacity, performing aggressive cleanup") aggressive_removed = await self._aggressive_contact_cleanup() if aggressive_removed > 0: actions_taken.append(f"Aggressive cleanup removed {aggressive_removed} contacts") # Log the management action if actions_taken: self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('contact_management', f'Contact list management: {"; ".join(actions_taken)}') ) return { 'status': status, 'actions_taken': actions_taken, 'success': True } except Exception as e: self.logger.error(f"Error managing contact list: {e}") return {'error': str(e), 'success': False} async def _remove_stale_contacts(self, stale_contacts: list[dict], max_remove: int = 10) -> int: """Remove stale contacts to free up space""" try: removed_count = 0 for contact in stale_contacts[:max_remove]: try: contact_name = contact['name'] public_key = contact['public_key'] self.logger.info(f"Removing stale contact: {contact_name} (last seen {contact['days_stale']} days ago)") # Check if we have a valid public key if not public_key or public_key.strip() == '': self.logger.warning(f"Skipping stale contact '{contact_name}': no public key available") continue # Remove from device using MeshCore API result = await asyncio.wait_for( self.bot.meshcore.commands.remove_contact(public_key), timeout=15.0 ) if result.type == EventType.OK: removed_count += 1 self.logger.info(f"✅ Successfully removed stale contact: {contact_name}") # Log the removal self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('stale_contact_removal', f'Removed stale contact: {contact_name} (last seen {contact["days_stale"]} days ago)') ) else: error_code = result.payload.get('error_code', 'unknown') if hasattr(result, 'payload') else 'unknown' self.logger.warning(f"❌ Failed to remove stale contact: {contact_name} - Error: {result.type}, Code: {error_code}") # Small delay between removals await asyncio.sleep(1) except Exception as e: self.logger.error(f"Error removing stale contact {contact.get('name', 'Unknown')}: {e}") continue return removed_count except Exception as e: self.logger.error(f"Error removing stale contacts: {e}") return 0 async def _aggressive_contact_cleanup(self) -> int: """Perform aggressive cleanup when at critical limit""" try: removed_count = 0 # Remove very old repeaters (7+ days) old_repeaters = await self.purge_old_repeaters(days_old=7, reason="Aggressive cleanup - critical limit") removed_count += old_repeaters # Remove very stale contacts (14+ days) very_stale = await self._get_stale_contacts(days_without_advert=14) stale_removed = await self._remove_stale_contacts(very_stale, max_remove=20) removed_count += stale_removed return removed_count except Exception as e: self.logger.error(f"Error in aggressive contact cleanup: {e}") return 0 async def add_discovered_contact(self, contact_name: str, public_key: Optional[str] = None, reason: str = "Manual addition") -> bool: """Add a discovered contact to the contact list using multiple methods""" try: self.logger.info(f"Adding discovered contact: {contact_name}") # Track whether contact addition was successful contact_addition_successful = False # Method 1: Try using meshcore commands if available if hasattr(self.bot.meshcore, 'commands'): try: self.logger.info("Method 1: Attempting addition via meshcore commands...") # Check if there's an add_contact method if hasattr(self.bot.meshcore.commands, 'add_contact'): # Try different parameter combinations try: # Try with contact_name and public_key result = await self.bot.meshcore.commands.add_contact(contact_name, public_key) if result: self.logger.info(f"Successfully added contact '{contact_name}' via meshcore commands (name+key)") contact_addition_successful = True except Exception as e1: self.logger.debug(f"add_contact(name, key) failed: {e1}") try: # Try with just contact_name result = await self.bot.meshcore.commands.add_contact(contact_name) if result: self.logger.info(f"Successfully added contact '{contact_name}' via meshcore commands (name only)") contact_addition_successful = True except Exception as e2: self.logger.debug(f"add_contact(name) failed: {e2}") self.logger.warning("All meshcore commands add_contact attempts failed") else: self.logger.info("No add_contact method found in meshcore commands") except Exception as e: self.logger.warning(f"Meshcore commands addition failed: {e}") # Method 2: Try CLI as fallback if not contact_addition_successful: try: self.logger.info("Method 2: Attempting addition via CLI...") import io import sys from meshcore_cli.meshcore_cli import next_cmd # Capture stdout/stderr to catch any error messages old_stdout = sys.stdout old_stderr = sys.stderr captured_output = io.StringIO() captured_errors = io.StringIO() try: sys.stdout = captured_output sys.stderr = captured_errors result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["add_contact", contact_name, public_key] if public_key else ["add_contact", contact_name]), timeout=15.0 ) finally: sys.stdout = old_stdout sys.stderr = old_stderr # Get captured output stdout_content = captured_output.getvalue() stderr_content = captured_errors.getvalue() all_output = stdout_content + stderr_content self.logger.debug(f"CLI command result: {result}") self.logger.debug(f"CLI captured output: {all_output}") if result is not None: self.logger.info(f"CLI: Successfully added contact '{contact_name}' from device") contact_addition_successful = True else: self.logger.warning(f"CLI: Contact addition command returned no result for '{contact_name}'") except Exception as e: self.logger.warning(f"CLI addition failed: {e}") # Method 3: Try discovery approach as last resort if not contact_addition_successful: try: self.logger.info("Method 3: Attempting addition via discovery...") from meshcore_cli.meshcore_cli import next_cmd result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["discover_companion_contacts"]), timeout=30.0 ) if result is not None: self.logger.info("Contact discovery initiated") contact_addition_successful = True else: self.logger.warning("Contact discovery failed") except Exception as e: self.logger.warning(f"Discovery addition failed: {e}") # Log the addition if successful if contact_addition_successful: self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('contact_addition', f'Added discovered contact: {contact_name} - {reason}') ) self.logger.info(f"Successfully added contact '{contact_name}': {reason}") return True else: self.logger.error(f"Failed to add contact '{contact_name}' - all methods failed") return False except Exception as e: self.logger.error(f"Error adding discovered contact: {e}") return False async def toggle_auto_add(self, enabled: bool, reason: str = "Manual toggle") -> bool: """Toggle the manual contact addition setting on the device""" try: from meshcore_cli.meshcore_cli import next_cmd self.logger.info(f"{'Enabling' if enabled else 'Disabling'} manual contact addition on device...") result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["set_manual_add_contacts", "true" if enabled else "false"]), timeout=15.0 ) self.logger.info(f"Successfully {'enabled' if enabled else 'disabled'} manual contact addition") self.logger.debug(f"Manual contact addition toggle result: {result}") # Log the action self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('manual_add_toggle', f'{"Enabled" if enabled else "Disabled"} manual contact addition - {reason}') ) return True except asyncio.TimeoutError: self.logger.warning("Timeout toggling manual contact addition (LoRa communication)") return False except Exception as e: self.logger.error(f"Failed to toggle manual contact addition: {e}") return False async def discover_companion_contacts(self, reason: str = "Manual discovery") -> bool: """Manually discover companion contacts""" try: from meshcore_cli.meshcore_cli import next_cmd self.logger.info("Starting manual companion contact discovery...") result = await asyncio.wait_for( next_cmd(self.bot.meshcore, ["discover_companion_contacts"]), timeout=30.0 ) self.logger.info("Successfully initiated companion contact discovery") self.logger.debug(f"Discovery result: {result}") # Log the action self.db_manager.execute_update( 'INSERT INTO purging_log (action, details) VALUES (?, ?)', ('companion_discovery', f'Manual companion contact discovery - {reason}') ) return True except asyncio.TimeoutError: self.logger.warning("Timeout during companion contact discovery (LoRa communication)") return False except Exception as e: self.logger.error(f"Failed to discover companion contacts: {e}") return False async def restore_repeater(self, public_key: str, reason: str = "Manual restore") -> bool: """Restore a previously purged repeater""" try: # Get repeater info before updating result = self.db_manager.execute_query(''' SELECT name, contact_data FROM repeater_contacts WHERE public_key = ? ''', (public_key,)) if not result: self.logger.warning(f"No repeater found with public key {public_key}") return False name = result[0]['name'] # Mark as active again self.db_manager.execute_update( 'UPDATE repeater_contacts SET is_active = 1 WHERE public_key = ?', (public_key,) ) # Log the restore action self.db_manager.execute_update(''' INSERT INTO purging_log (action, public_key, name, reason) VALUES ('restored', ?, ?, ?) ''', (public_key, name, reason)) # Note: Restoring a contact to the device would require re-adding it # This is complex as it requires the contact's URI or public key # For now, we just mark it as active in our database # The contact would need to be re-discovered through normal mesh operations self.logger.info(f"Restored repeater {name} ({public_key}) - contact will need to be re-discovered") return True except Exception as e: self.logger.error(f"Error restoring repeater {public_key}: {e}") return False async def get_purging_stats(self) -> dict: """Get statistics about repeater purging operations""" try: # Get total counts total_repeaters = self.db_manager.execute_query('SELECT COUNT(*) as count FROM repeater_contacts')[0]['count'] active_repeaters = self.db_manager.execute_query('SELECT COUNT(*) as count FROM repeater_contacts WHERE is_active = 1')[0]['count'] purged_repeaters = self.db_manager.execute_query('SELECT COUNT(*) as count FROM repeater_contacts WHERE is_active = 0')[0]['count'] # Get recent purging activity recent_activity = self.db_manager.execute_query(''' SELECT action, COUNT(*) as count FROM purging_log WHERE timestamp > datetime('now', '-7 days') GROUP BY action ''') return { 'total_repeaters': total_repeaters, 'active_repeaters': active_repeaters, 'purged_repeaters': purged_repeaters, 'recent_activity_7_days': {row['action']: row['count'] for row in recent_activity} } except Exception as e: self.logger.error(f"Error getting purging stats: {e}") return {} async def cleanup_database(self, days_to_keep_logs: int = 90): """Clean up old purging log entries""" try: cutoff_date = datetime.now() - timedelta(days=days_to_keep_logs) deleted_count = self.db_manager.execute_update( 'DELETE FROM purging_log WHERE timestamp < ?', (cutoff_date.isoformat(),) ) if deleted_count > 0: self.logger.info(f"Cleaned up {deleted_count} old purging log entries") except Exception as e: self.logger.error(f"Error cleaning up database: {e}") def cleanup_repeater_retention( self, daily_stats_days: int = 90, observed_paths_days: int = 90 ) -> None: """Clean up old daily_stats, unique_advert_packets, and observed_paths rows. Called from the scheduler so retention is enforced even when stats command is not run.""" try: total_deleted = 0 # daily_stats and unique_advert_packets use date column cutoff_date = (datetime.now() - timedelta(days=daily_stats_days)).date().isoformat() n = self.db_manager.execute_update( 'DELETE FROM daily_stats WHERE date < ?', (cutoff_date,) ) if n > 0: self.logger.info(f"Cleaned up {n} old daily_stats entries (older than {daily_stats_days} days)") total_deleted += n n = self.db_manager.execute_update( 'DELETE FROM unique_advert_packets WHERE date < ?', (cutoff_date,) ) if n > 0: self.logger.info(f"Cleaned up {n} old unique_advert_packets entries (older than {daily_stats_days} days)") total_deleted += n # observed_paths uses last_seen (timestamp) cutoff_ts = (datetime.now() - timedelta(days=observed_paths_days)).isoformat() n = self.db_manager.execute_update( 'DELETE FROM observed_paths WHERE last_seen < ?', (cutoff_ts,) ) if n > 0: self.logger.info(f"Cleaned up {n} old observed_paths entries (older than {observed_paths_days} days)") total_deleted += n except Exception as e: self.logger.error(f"Error cleaning up repeater retention tables: {e}") # Delegate geocoding cache methods to db_manager def get_cached_geocoding(self, query: str) -> tuple[Optional[float], Optional[float]]: """Get cached geocoding result for a query""" return self.db_manager.get_cached_geocoding(query) def cache_geocoding(self, query: str, latitude: float, longitude: float, cache_hours: int = 720): """Cache geocoding result for future use (default: 30 days)""" self.db_manager.cache_geocoding(query, latitude, longitude, cache_hours) def cleanup_geocoding_cache(self): """Remove expired geocoding cache entries""" self.db_manager.cleanup_geocoding_cache() async def populate_missing_geolocation_data(self, dry_run: bool = False, batch_size: int = 10) -> dict[str, Any]: """Populate missing geolocation data (state, country) for repeaters that have coordinates but missing location info""" try: # Check network connectivity first if not dry_run: try: import socket socket.create_connection(("nominatim.openstreetmap.org", 443), timeout=5) except OSError: return { 'total_found': 0, 'updated': 0, 'errors': 1, 'skipped': 0, 'error': 'No network connectivity to geocoding service' } # Find contacts with valid coordinates but missing state or country # Use complete_contact_tracking table to match the geocoding status command repeaters_to_update = self.db_manager.execute_query(''' SELECT id, name, latitude, longitude, city, state, country FROM complete_contact_tracking WHERE latitude IS NOT NULL AND longitude IS NOT NULL AND NOT (latitude = 0.0 AND longitude = 0.0) AND latitude BETWEEN -90 AND 90 AND longitude BETWEEN -180 AND 180 AND (city IS NULL OR city = '' OR state IS NULL OR country IS NULL) AND last_geocoding_attempt IS NULL ORDER BY last_heard DESC LIMIT ? ''', (batch_size,)) if not repeaters_to_update: return { 'total_found': 0, 'updated': 0, 'errors': 0, 'skipped': 0 } self.logger.info(f"Found {len(repeaters_to_update)} repeaters with missing geolocation data") updated_count = 0 error_count = 0 skipped_count = 0 for repeater in repeaters_to_update: repeater_id = repeater['id'] name = repeater['name'] latitude = repeater['latitude'] longitude = repeater['longitude'] current_city = repeater['city'] current_state = repeater['state'] current_country = repeater['country'] try: # Get full location information from coordinates location_info = self._get_full_location_from_coordinates(latitude, longitude, packet_hash=None) # Debug logging to see what we got self.logger.debug(f"Geocoding result for {name}: city='{location_info['city']}', state='{location_info['state']}', country='{location_info['country']}'") # Check if we got any useful data if not any(location_info.values()): self.logger.debug(f"No location data found for {name} at {latitude}, {longitude}") skipped_count += 1 # Still add delay to be respectful to the API await asyncio.sleep(2.0) continue # Determine what needs to be updated updates = [] params = [] # Update city if we don't have one or if the new one is more detailed if not current_city and location_info['city']: updates.append('city = ?') params.append(location_info['city']) elif current_city and location_info['city'] and len(location_info['city']) > len(current_city): # Update if new city info is more detailed (e.g., includes neighborhood) updates.append('city = ?') params.append(location_info['city']) # Update state if missing if not current_state and location_info['state']: updates.append('state = ?') params.append(location_info['state']) # Update country if missing if not current_country and location_info['country']: updates.append('country = ?') params.append(location_info['country']) if updates: if not dry_run: # Update the database - use complete_contact_tracking table update_query = f"UPDATE complete_contact_tracking SET {', '.join(updates)} WHERE id = ?" params.append(repeater_id) self.db_manager.execute_update(update_query, tuple(params)) # Log the actual values being updated update_details = [] for i, update in enumerate(updates): field = update.split(' = ')[0] value = params[i] if i < len(params) else 'Unknown' update_details.append(f"{field} = {value}") self.logger.info(f"Updated geolocation for {name}: {', '.join(update_details)}") else: self.logger.info(f"[DRY RUN] Would update {name}: {', '.join(updates)}") updated_count += 1 else: self.logger.debug(f"No updates needed for {name}") skipped_count += 1 # Add longer delay to avoid overwhelming the geocoding service # Nominatim has a rate limit of 1 request per second, we'll be more conservative await asyncio.sleep(2.0) except Exception as e: error_msg = str(e) if "429" in error_msg or "Bandwidth limit exceeded" in error_msg: self.logger.warning(f"Rate limited by geocoding service for {name}. Waiting longer...") # Wait longer if we're rate limited await asyncio.sleep(10.0) error_count += 1 elif "No route to host" in error_msg or "Connection" in error_msg: self.logger.warning(f"Network connectivity issue for {name}. Skipping...") # Skip this repeater due to network issues skipped_count += 1 else: self.logger.error(f"Error updating geolocation for {name}: {e}") error_count += 1 continue result = { 'total_found': len(repeaters_to_update), 'updated': updated_count, 'errors': error_count, 'skipped': skipped_count } if not dry_run: self.logger.info(f"Geolocation update completed: {updated_count} updated, {error_count} errors, {skipped_count} skipped") else: self.logger.info(f"Geolocation update dry run completed: {updated_count} would be updated, {error_count} errors, {skipped_count} skipped") return result except Exception as e: self.logger.error(f"Error populating missing geolocation data: {e}") return { 'total_found': 0, 'updated': 0, 'errors': 1, 'skipped': 0, 'error': str(e) } async def periodic_contact_monitoring(self): """Periodic monitoring of contact limit and auto-purge if needed""" try: if not self.auto_purge_enabled: return current_count = len(self.bot.meshcore.contacts) # Log current status if current_count >= self.auto_purge_threshold: self.logger.warning(f"⚠️ Contact limit monitoring: {current_count}/{self.contact_limit} contacts (threshold: {self.auto_purge_threshold})") # Trigger auto-purge await self.check_and_auto_purge() elif current_count >= self.auto_purge_threshold - 20: self.logger.info(f"📊 Contact limit monitoring: {current_count}/{self.contact_limit} contacts (approaching threshold)") else: self.logger.debug(f"📊 Contact limit monitoring: {current_count}/{self.contact_limit} contacts (healthy)") # Background geocoding for contacts missing location data await self._background_geocoding() except Exception as e: self.logger.error(f"Error in periodic contact monitoring: {e}") async def _background_geocoding(self): """Background geocoding for contacts missing location data""" try: # Find contacts with coordinates but missing city data contacts_needing_geocoding = self.db_manager.execute_query(''' SELECT id, name, latitude, longitude, city, state, country FROM complete_contact_tracking WHERE latitude IS NOT NULL AND longitude IS NOT NULL AND (city IS NULL OR city = '') AND last_geocoding_attempt IS NULL ORDER BY last_heard DESC LIMIT 1 ''') if not contacts_needing_geocoding: return contact = contacts_needing_geocoding[0] contact_id = contact['id'] name = contact['name'] lat = contact['latitude'] lon = contact['longitude'] self.logger.debug(f"🌍 Background geocoding: {name} ({lat}, {lon})") # Attempt geocoding try: # Get city from coordinates city = self._get_city_from_coordinates(lat, lon) # Get state and country from coordinates state, country = self._get_state_country_from_coordinates(lat, lon) # Update the contact with geocoded data updates = [] params = [] if city: updates.append("city = ?") params.append(city) if state: updates.append("state = ?") params.append(state) if country: updates.append("country = ?") params.append(country) # Always update the geocoding attempt timestamp updates.append("last_geocoding_attempt = ?") params.append(datetime.now()) if updates: params.append(contact_id) query = f"UPDATE complete_contact_tracking SET {', '.join(updates)} WHERE id = ?" self.db_manager.execute_update(query, params) self.logger.info(f"✅ Background geocoding successful: {name} → {city or 'Unknown'}, {state or 'Unknown'}, {country or 'Unknown'}") else: # Mark as attempted even if no data was found self.db_manager.execute_update( 'UPDATE complete_contact_tracking SET last_geocoding_attempt = ? WHERE id = ?', (datetime.now(), contact_id) ) self.logger.debug(f"🌍 Background geocoding: {name} - no additional location data found") except Exception as e: # Mark as attempted even if geocoding failed self.db_manager.execute_update( 'UPDATE complete_contact_tracking SET last_geocoding_attempt = ? WHERE id = ?', (datetime.now(), contact_id) ) self.logger.debug(f"🌍 Background geocoding failed for {name}: {e}") except Exception as e: self.logger.debug(f"Background geocoding error: {e}") async def _update_contact_limit_from_device(self): """Update contact limit from device using proper MeshCore API""" try: # Use the correct MeshCore API to get device info device_info = await self.bot.meshcore.commands.send_device_query() # Check if the query was successful if hasattr(device_info, 'type') and device_info.type.name == 'DEVICE_INFO': max_contacts = device_info.payload.get("max_contacts") if max_contacts and max_contacts > 100: self.contact_limit = max_contacts # Update threshold to be 20 contacts below the limit self.auto_purge_threshold = max(200, max_contacts - 20) self.logger.debug(f"Updated contact limit from device query: {self.contact_limit} (threshold: {self.auto_purge_threshold})") return True else: self.logger.debug(f"Device returned invalid max_contacts: {max_contacts}") else: self.logger.debug(f"Device query failed: {device_info}") except Exception as e: self.logger.debug(f"Could not update contact limit from device: {e}") # Keep default values if device query failed self.logger.debug(f"Using default contact limit: {self.contact_limit}") return False async def get_auto_purge_status(self) -> dict: """Get current auto-purge configuration and status""" try: # Update contact limit from device info await self._update_contact_limit_from_device() current_count = len(self.bot.meshcore.contacts) return { 'enabled': self.auto_purge_enabled, 'contact_limit': self.contact_limit, 'threshold': self.auto_purge_threshold, 'current_count': current_count, 'usage_percentage': (current_count / self.contact_limit) * 100, 'is_near_limit': current_count >= self.auto_purge_threshold, 'is_at_limit': current_count >= self.contact_limit } except Exception as e: self.logger.error(f"Error getting auto-purge status: {e}") return { 'enabled': False, 'error': str(e) } async def test_purge_system(self) -> dict: """Test the improved purge system with a single contact""" try: # Find a test contact to purge test_contact = None test_public_key = None # Look for a repeater contact to test with for key, contact_data in list(self.bot.meshcore.contacts.items()): if self._is_repeater_device(contact_data): test_contact = contact_data test_public_key = str(contact_data.get('public_key', key)) break if not test_contact: return { 'success': False, 'error': 'No repeater contacts found to test with', 'contact_count': len(self.bot.meshcore.contacts) } contact_name = test_contact.get('adv_name', test_contact.get('name', 'Unknown')) initial_count = len(self.bot.meshcore.contacts) self.logger.info(f"Testing purge system with contact: {contact_name}") # Test the purge (test_public_key is guaranteed non-None here; set in the loop above) assert test_public_key is not None success = await self.purge_repeater_from_contacts(test_public_key, "Test purge - system validation") final_count = len(self.bot.meshcore.contacts) return { 'success': success, 'test_contact': contact_name, 'initial_count': initial_count, 'final_count': final_count, 'contacts_removed': initial_count - final_count, 'purge_method': 'Improved MeshCore API' } except Exception as e: self.logger.error(f"Error testing purge system: {e}") return { 'success': False, 'error': str(e) } def get_daily_advertisement_stats(self, days: int = 30) -> dict: """Get daily advertisement statistics for the specified number of days""" try: from datetime import date, timedelta # Calculate date range end_date = date.today() start_date = end_date - timedelta(days=days-1) # Get daily advertisement counts with contact details daily_stats = self.db_manager.execute_query(''' SELECT ds.date, COUNT(DISTINCT ds.public_key) as unique_nodes, SUM(ds.advert_count) as total_adverts, AVG(ds.advert_count) as avg_adverts_per_node, COUNT(DISTINCT c.role) as unique_roles, COUNT(DISTINCT c.device_type) as unique_device_types FROM daily_stats ds LEFT JOIN complete_contact_tracking c ON ds.public_key = c.public_key WHERE ds.date >= ? AND ds.date <= ? GROUP BY ds.date ORDER BY ds.date DESC ''', (start_date, end_date)) # Get summary statistics summary = self.db_manager.execute_query(''' SELECT COUNT(DISTINCT ds.public_key) as total_unique_nodes, SUM(ds.advert_count) as total_advertisements, COUNT(DISTINCT ds.date) as active_days, AVG(ds.advert_count) as avg_adverts_per_day, COUNT(DISTINCT c.role) as unique_roles, COUNT(DISTINCT c.device_type) as unique_device_types FROM daily_stats ds LEFT JOIN complete_contact_tracking c ON ds.public_key = c.public_key WHERE ds.date >= ? AND ds.date <= ? ''', (start_date, end_date)) return { 'daily_stats': daily_stats, 'summary': summary[0] if summary else {}, 'date_range': { 'start': start_date.isoformat(), 'end': end_date.isoformat(), 'days': days } } except Exception as e: self.logger.error(f"Error getting daily advertisement stats: {e}") return {'error': str(e)} def get_nodes_per_day_stats(self, days: int = 30) -> dict: """Get nodes-per-day statistics for accurate daily tracking""" try: from datetime import date, timedelta # Calculate date range end_date = date.today() start_date = end_date - timedelta(days=days-1) # Get nodes per day with role breakdowns nodes_per_day = self.db_manager.execute_query(''' SELECT ds.date, COUNT(DISTINCT ds.public_key) as unique_nodes, COUNT(DISTINCT CASE WHEN c.role = 'repeater' THEN ds.public_key END) as repeaters, COUNT(DISTINCT CASE WHEN c.role = 'companion' THEN ds.public_key END) as companions, COUNT(DISTINCT CASE WHEN c.role = 'roomserver' THEN ds.public_key END) as room_servers, COUNT(DISTINCT CASE WHEN c.role = 'sensor' THEN ds.public_key END) as sensors FROM daily_stats ds LEFT JOIN complete_contact_tracking c ON ds.public_key = c.public_key WHERE ds.date >= ? AND ds.date <= ? GROUP BY ds.date ORDER BY ds.date DESC ''', (start_date, end_date)) return { 'nodes_per_day': nodes_per_day, 'date_range': { 'start': start_date.isoformat(), 'end': end_date.isoformat(), 'days': days } } except Exception as e: self.logger.error(f"Error getting nodes per day stats: {e}") return {'error': str(e)}