mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
513 lines
21 KiB
Python
513 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Base command class for all MeshCore Bot commands
|
|
Provides common functionality and interface for command implementations
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional, List, Dict, Any, Tuple
|
|
from datetime import datetime
|
|
import pytz
|
|
import re
|
|
from ..models import MeshMessage
|
|
|
|
|
|
class BaseCommand(ABC):
|
|
"""Base class for all bot commands - Plugin Interface"""
|
|
|
|
# Plugin metadata - to be overridden by subclasses
|
|
name: str = ""
|
|
keywords: List[str] = [] # All trigger words for this command (including name and aliases)
|
|
description: str = ""
|
|
requires_dm: bool = False
|
|
cooldown_seconds: int = 0
|
|
category: str = "general"
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.logger = bot.logger
|
|
self._last_execution_time = 0
|
|
|
|
# Load allowed channels from config (standardized channel override)
|
|
self.allowed_channels = self._load_allowed_channels()
|
|
|
|
# Load translated keywords after initialization
|
|
self._load_translated_keywords()
|
|
|
|
def translate(self, key: str, **kwargs) -> str:
|
|
"""
|
|
Translate a key using the bot's translator
|
|
|
|
Args:
|
|
key: Dot-separated key path (e.g., 'commands.wx.usage')
|
|
**kwargs: Formatting parameters for string.format()
|
|
|
|
Returns:
|
|
Translated string, or key if translation not found
|
|
"""
|
|
if hasattr(self.bot, 'translator'):
|
|
return self.bot.translator.translate(key, **kwargs)
|
|
# Fallback if translator not available
|
|
return key
|
|
|
|
def translate_get_value(self, key: str) -> Any:
|
|
"""
|
|
Get a raw value from translations (can be string, list, dict, etc.)
|
|
|
|
Args:
|
|
key: Dot-separated key path (e.g., 'commands.hacker.sudo_errors')
|
|
|
|
Returns:
|
|
The value at the key path, or None if not found
|
|
"""
|
|
if hasattr(self.bot, 'translator'):
|
|
return self.bot.translator.get_value(key)
|
|
return None
|
|
|
|
def get_config_value(self, section: str, key: str, fallback=None, value_type: str = 'str'):
|
|
"""
|
|
Get config value with backward compatibility for section name changes.
|
|
|
|
For command configs, checks both old format (e.g., 'Hacker') and new format (e.g., 'Hacker_Command').
|
|
This allows smooth migration from old config format to new standardized format.
|
|
|
|
Args:
|
|
section: Config section name (new format preferred)
|
|
key: Config key name
|
|
fallback: Default value if not found
|
|
value_type: Type of value ('str', 'bool', 'int', 'float')
|
|
|
|
Returns:
|
|
Config value of appropriate type, or fallback if not found
|
|
"""
|
|
# Map of old section names to new standardized names
|
|
section_migration = {
|
|
'Hacker': 'Hacker_Command',
|
|
'Sports': 'Sports_Command',
|
|
'Stats': 'Stats_Command',
|
|
}
|
|
|
|
# Determine old and new section names
|
|
new_section = section
|
|
old_section = None
|
|
for old, new in section_migration.items():
|
|
if new == section:
|
|
old_section = old
|
|
break
|
|
|
|
# Try new section first, then old section for backward compatibility
|
|
sections_to_try = [new_section]
|
|
if old_section:
|
|
sections_to_try.append(old_section)
|
|
|
|
for sec in sections_to_try:
|
|
if self.bot.config.has_section(sec):
|
|
try:
|
|
if value_type == 'bool':
|
|
value = self.bot.config.getboolean(sec, key, fallback=fallback)
|
|
elif value_type == 'int':
|
|
value = self.bot.config.getint(sec, key, fallback=fallback)
|
|
elif value_type == 'float':
|
|
value = self.bot.config.getfloat(sec, key, fallback=fallback)
|
|
else:
|
|
value = self.bot.config.get(sec, key, fallback=fallback)
|
|
|
|
# If we got a value (not fallback), return it
|
|
if value != fallback or self.bot.config.has_option(sec, key):
|
|
# Log migration notice on first use of old section
|
|
if sec == old_section:
|
|
self.logger.info(f"Config migration: Using old section '[{old_section}]' for '{key}'. "
|
|
f"Please update to '[{new_section}]' in config.ini")
|
|
return value
|
|
except Exception as e:
|
|
self.logger.debug(f"Error reading config {sec}.{key}: {e}")
|
|
continue
|
|
|
|
return fallback
|
|
|
|
@abstractmethod
|
|
async def execute(self, message: MeshMessage) -> bool:
|
|
"""Execute the command with the given message"""
|
|
pass
|
|
|
|
def get_help_text(self) -> str:
|
|
"""Get help text for this command"""
|
|
return self.description or "No help available for this command."
|
|
|
|
def _load_allowed_channels(self) -> Optional[List[str]]:
|
|
"""
|
|
Load allowed channels from config.
|
|
|
|
Config format: [CommandName_Command]
|
|
channels = channel1,channel2,channel3
|
|
|
|
Returns:
|
|
- None: Use global monitor_channels (default behavior)
|
|
- Empty list []: Command disabled for all channels (only DMs)
|
|
- List of channels: Command only works in these channels
|
|
"""
|
|
# Derive section name from command name
|
|
# Convert "sports" -> "Sports_Command", "greeter" -> "Greeter_Command", etc.
|
|
section_name = f"{self.name.title().replace('_', '_')}_Command"
|
|
|
|
# Try to get channels config
|
|
channels_str = self.get_config_value(section_name, 'channels', fallback=None, value_type='str')
|
|
|
|
if channels_str is None:
|
|
return None # Use global monitor_channels
|
|
|
|
if channels_str.strip() == '':
|
|
return [] # Disabled for all channels (DM only)
|
|
|
|
# Parse comma-separated list
|
|
channels = [ch.strip() for ch in channels_str.split(',') if ch.strip()]
|
|
return channels if channels else None
|
|
|
|
def is_channel_allowed(self, message: MeshMessage) -> bool:
|
|
"""
|
|
Check if this command is allowed in the message's channel.
|
|
|
|
Returns:
|
|
- True if DM and command allows DMs (unless requires_dm is False, but that's separate)
|
|
- True if channel is in allowed_channels (or None for global)
|
|
- False otherwise
|
|
"""
|
|
# DMs are always allowed (unless requires_dm is False, but that's checked separately)
|
|
if message.is_dm:
|
|
return True
|
|
|
|
# If no channel override, use global monitor_channels
|
|
if self.allowed_channels is None:
|
|
return message.channel in self.bot.command_manager.monitor_channels
|
|
|
|
# If empty list, command is disabled for channels (DM only)
|
|
if self.allowed_channels == []:
|
|
return False
|
|
|
|
# Check if channel is in allowed list
|
|
return message.channel in self.allowed_channels
|
|
|
|
def can_execute(self, message: MeshMessage) -> bool:
|
|
"""Check if this command can be executed with the given message"""
|
|
# Check channel access (standardized channel override)
|
|
if not self.is_channel_allowed(message):
|
|
return False
|
|
|
|
# Check if command requires DM and message is not DM
|
|
if self.requires_dm and not message.is_dm:
|
|
return False
|
|
|
|
# Check cooldown
|
|
if self.cooldown_seconds > 0:
|
|
import time
|
|
current_time = time.time()
|
|
if (current_time - self._last_execution_time) < self.cooldown_seconds:
|
|
return False
|
|
|
|
# Check admin ACL if this command requires admin access
|
|
if self.requires_admin_access():
|
|
if not self._check_admin_access(message):
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_metadata(self) -> Dict[str, Any]:
|
|
"""Get plugin metadata for discovery and registration"""
|
|
return {
|
|
'name': self.name,
|
|
'keywords': self.keywords,
|
|
'description': self.description,
|
|
'requires_dm': self.requires_dm,
|
|
'cooldown_seconds': self.cooldown_seconds,
|
|
'category': self.category,
|
|
'class_name': self.__class__.__name__,
|
|
'module_name': self.__class__.__module__
|
|
}
|
|
|
|
async def send_response(self, message: MeshMessage, content: str) -> bool:
|
|
"""Unified method for sending responses to users"""
|
|
try:
|
|
# Use the command manager's send_response method to ensure response capture
|
|
return await self.bot.command_manager.send_response(message, content)
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to send response: {e}")
|
|
return False
|
|
|
|
def _record_execution(self):
|
|
"""Record the execution time for cooldown tracking"""
|
|
import time
|
|
self._last_execution_time = time.time()
|
|
|
|
def get_remaining_cooldown(self) -> int:
|
|
"""Get remaining cooldown time in seconds"""
|
|
if self.cooldown_seconds <= 0:
|
|
return 0
|
|
|
|
import time
|
|
current_time = time.time()
|
|
elapsed = current_time - self._last_execution_time
|
|
remaining = self.cooldown_seconds - elapsed
|
|
return max(0, int(remaining))
|
|
|
|
def _load_translated_keywords(self):
|
|
"""Load translated keywords from translation files"""
|
|
if not hasattr(self.bot, 'translator'):
|
|
self.logger.debug(f"Translator not available for {self.name}, skipping keyword loading")
|
|
return
|
|
|
|
try:
|
|
# Get translated keywords for this command
|
|
key = f"keywords.{self.name}"
|
|
translated_keywords = self.bot.translator.get_value(key)
|
|
|
|
if translated_keywords and isinstance(translated_keywords, list):
|
|
# Merge translated keywords with original keywords (avoid duplicates)
|
|
original_count = len(self.keywords)
|
|
all_keywords = list(self.keywords) # Start with original
|
|
for translated_keyword in translated_keywords:
|
|
if translated_keyword not in all_keywords:
|
|
all_keywords.append(translated_keyword)
|
|
self.keywords = all_keywords
|
|
added_count = len(self.keywords) - original_count
|
|
if added_count > 0:
|
|
self.logger.debug(f"Loaded {added_count} translated keyword(s) for {self.name}: {self.keywords}")
|
|
else:
|
|
self.logger.debug(f"No translated keywords found for {self.name} (key: {key})")
|
|
except Exception as e:
|
|
# Log the error for debugging
|
|
self.logger.debug(f"Could not load translated keywords for {self.name}: {e}")
|
|
|
|
def matches_keyword(self, message: MeshMessage) -> bool:
|
|
"""Check if this command matches the message content based on keywords"""
|
|
if not self.keywords:
|
|
return False
|
|
|
|
# Strip exclamation mark if present (for command-style messages)
|
|
content = message.content.strip()
|
|
if content.startswith('!'):
|
|
content = content[1:].strip()
|
|
content_lower = content.lower()
|
|
|
|
for keyword in self.keywords:
|
|
keyword_lower = keyword.lower()
|
|
|
|
# Check for exact match first
|
|
if keyword_lower == content_lower:
|
|
return True
|
|
|
|
# Check if the message starts with the keyword (followed by space or end of string)
|
|
# This ensures the keyword is the first word in the message
|
|
if content_lower.startswith(keyword_lower):
|
|
# Check if it's followed by a space or is the end of the message
|
|
if len(content_lower) == len(keyword_lower) or content_lower[len(keyword_lower)] == ' ':
|
|
return True
|
|
|
|
return False
|
|
|
|
def matches_custom_syntax(self, message: MeshMessage) -> bool:
|
|
"""Check if this command matches custom syntax patterns"""
|
|
# Override in subclasses for custom syntax matching
|
|
return False
|
|
|
|
def should_execute(self, message: MeshMessage) -> bool:
|
|
"""Check if this command should execute for the given message"""
|
|
# First check if keyword matches
|
|
if not (self.matches_keyword(message) or self.matches_custom_syntax(message)):
|
|
return False
|
|
|
|
# For DM-only commands, only consider them if:
|
|
# 1. Message is a DM, OR
|
|
# 2. Channel is in allowed_channels (or monitor_channels if no override)
|
|
# This prevents DM-only commands from being processed in public channels
|
|
if self.requires_dm and not message.is_dm:
|
|
# Check if channel is allowed for this command
|
|
if not self.is_channel_allowed(message):
|
|
# Channel not allowed - don't even consider this command
|
|
return False
|
|
|
|
return True
|
|
|
|
def can_execute_now(self, message: MeshMessage) -> bool:
|
|
"""Check if this command can execute right now (permissions, cooldown, etc.)"""
|
|
return self.can_execute(message)
|
|
|
|
def build_enhanced_connection_info(self, message: MeshMessage) -> str:
|
|
"""Build enhanced connection info with SNR, RSSI, and parsed route information"""
|
|
# Extract just the hops and path info without the route type
|
|
routing_info = message.path or "Unknown routing"
|
|
|
|
# Clean up the routing info to remove the "via ROUTE_TYPE_*" part
|
|
if "via ROUTE_TYPE_" in routing_info:
|
|
# Extract just the hops and path part
|
|
parts = routing_info.split(" via ROUTE_TYPE_")
|
|
if len(parts) > 0:
|
|
routing_info = parts[0]
|
|
|
|
# Add SNR and RSSI
|
|
snr_info = f"SNR: {message.snr or 'Unknown'} dB"
|
|
rssi_info = f"RSSI: {message.rssi or 'Unknown'} dBm"
|
|
|
|
# Build enhanced connection info
|
|
connection_info = f"{routing_info} | {snr_info} | {rssi_info}"
|
|
|
|
return connection_info
|
|
|
|
def format_timestamp(self, message: MeshMessage) -> str:
|
|
"""Format current bot time for display (not sender's timestamp to avoid clock issues)"""
|
|
try:
|
|
# Get configured timezone or use system timezone
|
|
timezone_str = self.bot.config.get('Bot', 'timezone', fallback='')
|
|
|
|
if timezone_str:
|
|
try:
|
|
# Use configured timezone
|
|
tz = pytz.timezone(timezone_str)
|
|
dt = datetime.now(tz)
|
|
except pytz.exceptions.UnknownTimeZoneError:
|
|
# Fallback to system timezone if configured timezone is invalid
|
|
dt = datetime.now()
|
|
else:
|
|
# Use system timezone
|
|
dt = datetime.now()
|
|
|
|
return dt.strftime("%H:%M:%S")
|
|
except:
|
|
return "Unknown"
|
|
|
|
def format_response(self, message: MeshMessage, response_format: str) -> str:
|
|
"""Format a response string with message data"""
|
|
try:
|
|
connection_info = self.build_enhanced_connection_info(message)
|
|
timestamp = self.format_timestamp(message)
|
|
|
|
return response_format.format(
|
|
sender=message.sender_id or "Unknown",
|
|
connection_info=connection_info,
|
|
path=message.path or "Unknown",
|
|
timestamp=timestamp,
|
|
snr=message.snr or "Unknown",
|
|
rssi=message.rssi or "Unknown"
|
|
)
|
|
except (KeyError, ValueError) as e:
|
|
self.logger.warning(f"Error formatting response: {e}")
|
|
return response_format
|
|
|
|
def get_response_format(self) -> Optional[str]:
|
|
"""Get the response format for this command from config"""
|
|
# Override in subclasses to provide custom response formats
|
|
return None
|
|
|
|
def requires_admin_access(self) -> bool:
|
|
"""Check if this command requires admin access"""
|
|
if not hasattr(self.bot, 'config'):
|
|
return False
|
|
|
|
try:
|
|
# Get list of admin commands from config
|
|
admin_commands = self.bot.config.get('Admin_ACL', 'admin_commands', fallback='')
|
|
if not admin_commands:
|
|
return False
|
|
|
|
# Check if this command name is in the admin commands list
|
|
admin_command_list = [cmd.strip() for cmd in admin_commands.split(',') if cmd.strip()]
|
|
return self.name in admin_command_list
|
|
except Exception as e:
|
|
self.logger.warning(f"Error checking admin access requirement: {e}")
|
|
return False
|
|
|
|
def _check_admin_access(self, message: MeshMessage) -> bool:
|
|
"""
|
|
Check if the message sender has admin access (security-hardened)
|
|
|
|
Security features:
|
|
- Strict pubkey format validation (64-char hex)
|
|
- No fallback to sender_id (prevents spoofing)
|
|
- Whitespace/empty config detection
|
|
- Normalized comparison (lowercase)
|
|
"""
|
|
import re # This import is needed for re.match
|
|
if not hasattr(self.bot, 'config'):
|
|
return False
|
|
|
|
try:
|
|
# Get admin pubkeys from config
|
|
admin_pubkeys = self.bot.config.get('Admin_ACL', 'admin_pubkeys', fallback='')
|
|
|
|
# Check for empty or whitespace-only configuration
|
|
if not admin_pubkeys.strip():
|
|
self.logger.warning("No admin pubkeys configured or empty/whitespace config")
|
|
return False
|
|
|
|
# Parse and VALIDATE admin pubkeys
|
|
admin_pubkey_list = []
|
|
for key in admin_pubkeys.split(','):
|
|
key = key.strip()
|
|
if not key:
|
|
continue
|
|
|
|
# Validate hex format (64 chars for ed25519 public keys)
|
|
if not re.match(r'^[0-9a-fA-F]{64}$', key):
|
|
self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...")
|
|
continue # Skip invalid keys but continue checking others
|
|
|
|
admin_pubkey_list.append(key.lower()) # Normalize to lowercase
|
|
|
|
if not admin_pubkey_list:
|
|
self.logger.error("No valid admin pubkeys found in config after validation")
|
|
return False
|
|
|
|
# Get sender's public key - NEVER fall back to sender_id
|
|
sender_pubkey = getattr(message, 'sender_pubkey', None)
|
|
if not sender_pubkey:
|
|
self.logger.warning(
|
|
f"No sender public key available for {message.sender_id} - "
|
|
"admin access denied (missing pubkey)"
|
|
)
|
|
return False
|
|
|
|
# Validate sender pubkey format
|
|
if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey):
|
|
self.logger.warning(
|
|
f"Invalid sender pubkey format from {message.sender_id}: "
|
|
f"{sender_pubkey[:16]}... - admin access denied"
|
|
)
|
|
return False
|
|
|
|
# Normalize and compare
|
|
sender_pubkey_normalized = sender_pubkey.lower()
|
|
is_admin = sender_pubkey_normalized in admin_pubkey_list
|
|
|
|
if not is_admin:
|
|
self.logger.warning(
|
|
f"Access denied for {message.sender_id} "
|
|
f"(pubkey: {sender_pubkey[:16]}...) - not in admin ACL"
|
|
)
|
|
else:
|
|
self.logger.info(
|
|
f"Admin access granted for {message.sender_id} "
|
|
f"(pubkey: {sender_pubkey[:16]}...)"
|
|
)
|
|
|
|
return is_admin
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error checking admin access: {e}")
|
|
return False # Fail securely
|
|
|
|
def _strip_quotes_from_config(self, value: str) -> str:
|
|
"""Strip quotes from config values if present"""
|
|
if value and value.startswith('"') and value.endswith('"'):
|
|
return value[1:-1]
|
|
return value
|
|
|
|
async def handle_keyword_match(self, message: MeshMessage) -> bool:
|
|
"""Handle keyword matching and response generation"""
|
|
response_format = self.get_response_format()
|
|
if response_format:
|
|
response = self.format_response(message, response_format)
|
|
return await self.send_response(message, response)
|
|
else:
|
|
# No response format configured - don't respond
|
|
# This prevents recursion and allows disabling commands by commenting them out in config
|
|
return False
|