mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
406 lines
17 KiB
Python
406 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Base command class for all MeshCore Bot commands
|
|
Provides common functionality and interface for command implementations
|
|
"""
|
|
|
|
import math
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple

import pytz

from ..models import MeshMessage
|
|
|
|
|
|
class BaseCommand(ABC):
    """Base class for all bot commands - Plugin Interface.

    Subclasses override the metadata attributes below and implement
    :meth:`execute`. This base class supplies the shared plumbing: keyword
    matching, cooldown tracking, admin ACL checks, config lookup with
    legacy-section fallback, translation access and response formatting.
    """

    # Plugin metadata - to be overridden by subclasses
    name: str = ""
    # All trigger words for this command (including name and aliases).
    # This default list is shared by every subclass that does not override
    # it, so it must never be mutated in place - rebind it instead (as
    # _load_translated_keywords does).
    keywords: List[str] = []
    description: str = ""
    requires_dm: bool = False
    cooldown_seconds: int = 0
    category: str = "general"

    # New (standardized) config section name -> legacy section name it replaced.
    _LEGACY_CONFIG_SECTIONS: Dict[str, str] = {
        'Hacker_Command': 'Hacker',
        'Sports_Command': 'Sports',
        'Stats_Command': 'Stats',
    }

    # value_type -> name of the config getter method that performs the conversion.
    _CONFIG_GETTERS: Dict[str, str] = {
        'bool': 'getboolean',
        'int': 'getint',
        'float': 'getfloat',
    }

    # (legacy section, key) pairs already reported, so each migration notice
    # is logged once per process instead of on every lookup.
    _migration_notices_logged = set()

    def __init__(self, bot):
        """Bind the command to ``bot`` and merge in any translated keywords.

        Args:
            bot: The bot instance. Must expose ``logger``; ``translator``,
                ``config`` and ``command_manager`` are used by other methods.
        """
        self.bot = bot
        self.logger = bot.logger
        # 0 means "never executed": any positive cooldown has long expired.
        self._last_execution_time = 0

        # Load translated keywords after initialization
        self._load_translated_keywords()

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    def _get_translator(self):
        """Return the bot's translator, or None if it is missing or unset."""
        return getattr(self.bot, 'translator', None)

    @staticmethod
    def _split_csv(raw: str) -> List[str]:
        """Split a comma-separated config value into stripped, non-empty items."""
        return [item.strip() for item in raw.split(',') if item.strip()]

    @staticmethod
    def _value_or_unknown(value):
        """Return ``value``, or "Unknown" when it is None or an empty string.

        Unlike ``value or "Unknown"`` this keeps a legitimate reading of 0.
        """
        return "Unknown" if value is None or value == "" else value

    def _log_migration_notice(self, old_section: str, new_section: str, key: str) -> None:
        """Log, once per (legacy section, key), that a legacy section was used."""
        notice = (old_section, key)
        if notice in BaseCommand._migration_notices_logged:
            return
        BaseCommand._migration_notices_logged.add(notice)
        self.logger.info(f"Config migration: Using old section '[{old_section}]' for '{key}'. "
                         f"Please update to '[{new_section}]' in config.ini")

    # ------------------------------------------------------------------
    # Translation
    # ------------------------------------------------------------------

    def translate(self, key: str, **kwargs) -> str:
        """
        Translate a key using the bot's translator

        Args:
            key: Dot-separated key path (e.g., 'commands.wx.usage')
            **kwargs: Formatting parameters passed through to the translator

        Returns:
            The translator's result, or ``key`` itself when the bot has no
            translator
        """
        translator = self._get_translator()
        if translator is None:
            # Fallback if translator not available
            return key
        return translator.translate(key, **kwargs)

    def translate_get_value(self, key: str) -> Any:
        """
        Get a raw value from translations (can be string, list, dict, etc.)

        Args:
            key: Dot-separated key path (e.g., 'commands.hacker.sudo_errors')

        Returns:
            Whatever the translator returns for the key path, or None when
            the bot has no translator
        """
        translator = self._get_translator()
        if translator is None:
            return None
        return translator.get_value(key)

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    def get_config_value(self, section: str, key: str, fallback=None, value_type: str = 'str'):
        """
        Get config value with backward compatibility for section name changes.

        For command configs, checks both new format (e.g., 'Hacker_Command')
        and old format (e.g., 'Hacker'), in that order. This allows smooth
        migration from old config format to new standardized format.

        Args:
            section: Config section name (new format preferred)
            key: Config key name
            fallback: Default value if not found
            value_type: Type of value ('str', 'bool', 'int', 'float');
                any other value is treated as 'str'

        Returns:
            Config value of appropriate type, or fallback if not found or if
            reading/converting the value failed in every candidate section
        """
        config = self.bot.config

        # Try new section first, then old section for backward compatibility
        sections_to_try = [section]
        old_section = self._LEGACY_CONFIG_SECTIONS.get(section)
        if old_section:
            sections_to_try.append(old_section)

        getter_name = self._CONFIG_GETTERS.get(value_type, 'get')

        for sec in sections_to_try:
            if not config.has_section(sec):
                continue
            try:
                value = getattr(config, getter_name)(sec, key, fallback=fallback)

                # A value equal to the fallback only counts as "found" when
                # the option is really present in this section.
                if value != fallback or config.has_option(sec, key):
                    if sec == old_section:
                        self._log_migration_notice(old_section, section, key)
                    return value
            except Exception as e:
                # Best effort: an unreadable/unconvertible value in one
                # section must not prevent trying the next one.
                self.logger.debug(f"Error reading config {sec}.{key}: {e}")
                continue

        return fallback

    def _strip_quotes_from_config(self, value: str) -> str:
        """Strip one pair of surrounding double quotes from a config value.

        A value is only unquoted when it is at least two characters long, so
        a lone ``"`` is returned unchanged rather than collapsing to ``""``.
        """
        if value and len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            return value[1:-1]
        return value

    # ------------------------------------------------------------------
    # Plugin interface
    # ------------------------------------------------------------------

    @abstractmethod
    async def execute(self, message: MeshMessage) -> bool:
        """Execute the command with the given message"""
        pass

    def get_help_text(self) -> str:
        """Get help text for this command (the description, or a stock notice)."""
        return self.description or "No help available for this command."

    def get_metadata(self) -> Dict[str, Any]:
        """Get plugin metadata for discovery and registration"""
        return {
            'name': self.name,
            'keywords': self.keywords,
            'description': self.description,
            'requires_dm': self.requires_dm,
            'cooldown_seconds': self.cooldown_seconds,
            'category': self.category,
            'class_name': self.__class__.__name__,
            'module_name': self.__class__.__module__,
        }

    def get_response_format(self) -> Optional[str]:
        """Get the response format for this command from config"""
        # Override in subclasses to provide custom response formats
        return None

    # ------------------------------------------------------------------
    # Execution gating (DM requirement, cooldown, admin ACL)
    # ------------------------------------------------------------------

    def can_execute(self, message: MeshMessage) -> bool:
        """Check if this command can be executed with the given message.

        Returns False when the command requires a DM and the message is not
        one, when the cooldown has not elapsed, or when the command is listed
        as admin-only and the sender is not in the admin ACL.
        """
        # Check if command requires DM and message is not DM
        if self.requires_dm and not message.is_dm:
            return False

        # Check cooldown
        if self.cooldown_seconds > 0:
            if (time.time() - self._last_execution_time) < self.cooldown_seconds:
                return False

        # Check admin ACL if this command requires admin access
        if self.requires_admin_access() and not self._check_admin_access(message):
            return False

        return True

    def can_execute_now(self, message: MeshMessage) -> bool:
        """Check if this command can execute right now (permissions, cooldown, etc.)"""
        return self.can_execute(message)

    def _record_execution(self):
        """Record the execution time for cooldown tracking"""
        self._last_execution_time = time.time()

    def get_remaining_cooldown(self) -> int:
        """Get remaining cooldown time in whole seconds, rounded up.

        Rounding up keeps this consistent with can_execute(): the result is 0
        only once the cooldown has actually elapsed.
        """
        if self.cooldown_seconds <= 0:
            return 0

        elapsed = time.time() - self._last_execution_time
        return max(0, math.ceil(self.cooldown_seconds - elapsed))

    def requires_admin_access(self) -> bool:
        """Check if this command's name is listed in [Admin_ACL] admin_commands."""
        if not hasattr(self.bot, 'config'):
            return False

        try:
            # Get list of admin commands from config
            admin_commands = self.bot.config.get('Admin_ACL', 'admin_commands', fallback='')
            if not admin_commands:
                return False

            # Check if this command name is in the admin commands list
            return self.name in self._split_csv(admin_commands)
        except Exception as e:
            self.logger.warning(f"Error checking admin access requirement: {e}")
            return False

    def _check_admin_access(self, message: MeshMessage) -> bool:
        """Check if the message sender has admin access.

        The sender's key is taken from ``message.sender_pubkey`` and, failing
        that, from ``message.sender_id``; it must equal one of the entries of
        [Admin_ACL] admin_pubkeys exactly. Any error denies access.
        """
        if not hasattr(self.bot, 'config'):
            return False

        try:
            # Get admin pubkeys from config
            admin_pubkeys = self.bot.config.get('Admin_ACL', 'admin_pubkeys', fallback='')
            if not admin_pubkeys:
                self.logger.warning("No admin pubkeys configured")
                return False

            # Parse admin pubkeys
            admin_pubkey_list = self._split_csv(admin_pubkeys)
            if not admin_pubkey_list:
                self.logger.warning("No valid admin pubkeys found in config")
                return False

            # Get sender's public key from message, falling back to
            # sender_id in case that holds the pubkey
            sender_pubkey = (getattr(message, 'sender_pubkey', None)
                             or getattr(message, 'sender_id', None))
            if not sender_pubkey:
                self.logger.warning(f"No sender public key found for message from {message.sender_id}")
                return False

            # Only allow exact matches for security
            is_admin = sender_pubkey in admin_pubkey_list

            if not is_admin:
                self.logger.info(f"Access denied for {message.sender_id} (pubkey: {sender_pubkey[:16]}...) - not in admin ACL")
            else:
                self.logger.info(f"Admin access granted for {message.sender_id} (pubkey: {sender_pubkey[:16]}...)")

            return is_admin

        except Exception as e:
            # Fail closed: an unexpected error must never grant access.
            self.logger.error(f"Error checking admin access: {e}")
            return False

    # ------------------------------------------------------------------
    # Keyword handling
    # ------------------------------------------------------------------

    def _load_translated_keywords(self):
        """Load translated keywords from translation files.

        Translated keywords found under ``keywords.<name>`` are appended to
        the command's own keywords (duplicates and non-string entries are
        skipped). The merged list is bound to the instance, leaving the
        class-level list untouched.
        """
        translator = self._get_translator()
        if translator is None:
            self.logger.debug(f"Translator not available for {self.name}, skipping keyword loading")
            return

        try:
            # Get translated keywords for this command
            key = f"keywords.{self.name}"
            translated_keywords = translator.get_value(key)

            if translated_keywords and isinstance(translated_keywords, list):
                # Merge translated keywords with original keywords (avoid duplicates)
                original_count = len(self.keywords)
                all_keywords = list(self.keywords)  # Start with original
                for translated_keyword in translated_keywords:
                    # Non-string entries would break keyword.lower() later on.
                    if isinstance(translated_keyword, str) and translated_keyword not in all_keywords:
                        all_keywords.append(translated_keyword)
                self.keywords = all_keywords
                added_count = len(self.keywords) - original_count
                if added_count > 0:
                    self.logger.debug(f"Loaded {added_count} translated keyword(s) for {self.name}: {self.keywords}")
            else:
                self.logger.debug(f"No translated keywords found for {self.name} (key: {key})")
        except Exception as e:
            # Translations are optional; log and carry on with the originals
            self.logger.debug(f"Could not load translated keywords for {self.name}: {e}")

    def matches_keyword(self, message: MeshMessage) -> bool:
        """Check if this command matches the message content based on keywords.

        Matching is case-insensitive and ignores one leading '!'. A keyword
        matches when it is the whole message or is followed by a space, i.e.
        it must be the first word of the message.
        """
        if not self.keywords:
            return False

        # Strip exclamation mark if present (for command-style messages)
        content = (message.content or "").strip()
        if content.startswith('!'):
            content = content[1:].strip()
        content_lower = content.lower()

        for keyword in self.keywords:
            keyword_lower = keyword.lower()
            # Exact match, or keyword followed by a space
            if content_lower == keyword_lower or content_lower.startswith(keyword_lower + ' '):
                return True

        return False

    def matches_custom_syntax(self, message: MeshMessage) -> bool:
        """Check if this command matches custom syntax patterns"""
        # Override in subclasses for custom syntax matching
        return False

    def should_execute(self, message: MeshMessage) -> bool:
        """Check if this command should execute for the given message"""
        return self.matches_keyword(message) or self.matches_custom_syntax(message)

    # ------------------------------------------------------------------
    # Responses
    # ------------------------------------------------------------------

    async def send_response(self, message: MeshMessage, content: str) -> bool:
        """Unified method for sending responses to users.

        Returns the command manager's result, or False if sending raised.
        """
        try:
            # Use the command manager's send_response method to ensure response capture
            return await self.bot.command_manager.send_response(message, content)
        except Exception as e:
            self.logger.error(f"Failed to send response: {e}")
            return False

    def build_enhanced_connection_info(self, message: MeshMessage) -> str:
        """Build enhanced connection info with SNR, RSSI, and parsed route information.

        Returns a string of the form "<routing> | SNR: <snr> dB | RSSI: <rssi> dBm",
        where missing values are shown as "Unknown" / "Unknown routing".
        """
        routing_info = message.path or "Unknown routing"

        # Keep only the hops/path part, dropping any " via ROUTE_TYPE_*" suffix
        routing_info = routing_info.split(" via ROUTE_TYPE_", 1)[0]

        # Add SNR and RSSI (a reading of 0 is valid and must be shown)
        snr_info = f"SNR: {self._value_or_unknown(message.snr)} dB"
        rssi_info = f"RSSI: {self._value_or_unknown(message.rssi)} dBm"

        return f"{routing_info} | {snr_info} | {rssi_info}"

    def format_timestamp(self, message: MeshMessage) -> str:
        """Format current bot time for display (not sender's timestamp to avoid clock issues).

        Uses the [Bot] timezone setting when it names a known timezone,
        otherwise the system's local time. Returns "Unknown" on any error.
        """
        try:
            # Get configured timezone or use system timezone
            timezone_str = self.bot.config.get('Bot', 'timezone', fallback='')

            if timezone_str:
                try:
                    # Use configured timezone
                    dt = datetime.now(pytz.timezone(timezone_str))
                except pytz.exceptions.UnknownTimeZoneError:
                    # Fallback to system timezone if configured timezone is invalid
                    dt = datetime.now()
            else:
                # Use system timezone
                dt = datetime.now()

            return dt.strftime("%H:%M:%S")
        except Exception as e:
            # Deliberately best-effort, but no longer a bare except: that
            # would also swallow KeyboardInterrupt/SystemExit.
            self.logger.debug(f"Could not format timestamp: {e}")
            return "Unknown"

    def format_response(self, message: MeshMessage, response_format: str) -> str:
        """Format a response string with message data.

        Available placeholders: {sender}, {connection_info}, {path},
        {timestamp}, {snr}, {rssi}. If the format string is invalid (unknown
        or positional placeholder, malformed braces) it is returned unchanged.
        """
        try:
            connection_info = self.build_enhanced_connection_info(message)
            timestamp = self.format_timestamp(message)

            return response_format.format(
                sender=message.sender_id or "Unknown",
                connection_info=connection_info,
                path=message.path or "Unknown",
                timestamp=timestamp,
                snr=self._value_or_unknown(message.snr),
                rssi=self._value_or_unknown(message.rssi),
            )
        except (KeyError, IndexError, ValueError) as e:
            # IndexError: positional fields such as "{}" or "{0}" in the format
            self.logger.warning(f"Error formatting response: {e}")
            return response_format

    async def handle_keyword_match(self, message: MeshMessage) -> bool:
        """Handle keyword matching and response generation.

        Returns the result of sending the formatted response, or False when
        no response format is configured.
        """
        response_format = self.get_response_format()
        if not response_format:
            # No response format configured - don't respond
            # This prevents recursion and allows disabling commands by commenting them out in config
            return False

        response = self.format_response(message, response_format)
        return await self.send_response(message, response)
|