Refactor command handling and enhance plugin validation

- Removed redundant command list in config example for clarity.
- Improved plugin validation by adding checks for required attributes and types, ensuring better error handling during plugin instantiation.
- Enhanced rate limiter classes to track total sends and throttled attempts, providing statistics for better monitoring.
- Updated command implementations to include metadata for better organization and clarity in command handling.
This commit is contained in:
agessaman
2026-01-01 12:03:54 -08:00
parent e5f98d69e6
commit 5ae67bc071
6 changed files with 261 additions and 40 deletions

View File

@@ -233,7 +233,9 @@ test = "ack @[{sender}]{phrase_part} | {connection_info} | Received at: {timesta
ping = "Pong!"
pong = "Ping!"
help = "Bot Help: test (or t), ping, help, hello, cmd, advert, @string, wx, aqi, sun, moon, solar, hfcond, satpass, prefix, path, sports, dice, roll, stats | More: 'help <command>'"
cmd = "Available commands: test (or t), ping, help, hello, cmd, advert, @string, wx, aqi, sun, moon, solar, hfcond, satpass, prefix, path, sports, dice, roll, stats"
Override 'cmd' command output
# cmd = "Available commands: test (or t), ping, help, hello, cmd, advert, @string, wx, aqi, sun, moon, solar, hfcond, satpass, prefix, path, sports, dice, roll, stats"
[Channels]
# Channels to monitor (comma-separated)

View File

@@ -11,12 +11,123 @@ from ..models import MeshMessage
class CmdCommand(BaseCommand):
"""Handles the cmd command"""
# Plugin metadata
name = "cmd"
keywords = ['cmd', 'commands']
description = "Lists available commands in compact format"
category = "basic"
def get_help_text(self) -> str:
return "Lists commands in compact format."
def _get_commands_list(self, max_length: int = None) -> str:
"""Get a compact list of available commands, prioritizing important ones
Args:
max_length: Maximum length for the command list (None = no limit)
Returns:
Comma-separated list of commands, truncated if necessary
"""
# Define priority order - most important/commonly used commands first
priority_commands = [
'test', 'ping', 'help', 'hello', 'cmd', 'advert',
'wx', 'aqi', 'sun', 'moon', 'solar', 'hfcond', 'satpass',
'prefix', 'path', 'sports', 'dice', 'roll', 'stats'
]
# Get all command names
all_commands = []
# Include plugin commands
for cmd_name, cmd_instance in self.bot.command_manager.commands.items():
# Skip system commands without keywords (like greeter)
if hasattr(cmd_instance, 'keywords') and cmd_instance.keywords:
all_commands.append(cmd_name)
# Include config keywords that aren't handled by plugins
for keyword in self.bot.command_manager.keywords.keys():
# Check if this keyword is already handled by a plugin
is_plugin_keyword = any(
keyword.lower() in [k.lower() for k in cmd.keywords]
for cmd in self.bot.command_manager.commands.values()
)
if not is_plugin_keyword:
all_commands.append(keyword)
# Remove duplicates and sort
all_commands = sorted(set(all_commands))
# Prioritize: put priority commands first, then others
prioritized = []
remaining = []
for cmd in all_commands:
if cmd in priority_commands:
prioritized.append(cmd)
else:
remaining.append(cmd)
# Sort priority commands by their order in priority_commands list
prioritized = sorted(prioritized, key=lambda x: priority_commands.index(x) if x in priority_commands else 999)
# Combine: priority first, then others
command_names = prioritized + sorted(remaining)
# Build the list, respecting max_length if provided
if max_length is None:
return ', '.join(command_names)
# Build list within length limit
result = []
prefix = "Available commands: "
current_length = len(prefix)
for cmd in command_names:
# Calculate length if we add this command: ", cmd" or "cmd" (first one)
if result:
test_length = current_length + len(', ') + len(cmd)
else:
test_length = current_length + len(cmd)
if test_length <= max_length:
if result:
result.append(cmd)
current_length += len(', ') + len(cmd)
else:
result.append(cmd)
current_length += len(cmd)
else:
# Can't fit this command, add count of remaining
remaining_count = len(command_names) - len(result)
if remaining_count > 0:
suffix = f" (+{remaining_count} more)"
if current_length + len(suffix) <= max_length:
result.append(suffix.replace('+', '').replace('more', f'{remaining_count} more'))
break
return prefix + ', '.join(result)
async def execute(self, message: MeshMessage) -> bool:
"""Execute the cmd command"""
# The cmd command is handled by keyword matching in the command manager
# This is just a placeholder for future functionality
self.logger.debug("Cmd command executed (handled by keyword matching)")
return True
try:
# Check if user has defined a custom cmd keyword response in config
# Use the already-loaded keywords dict (quotes are already stripped)
cmd_keyword = self.bot.command_manager.keywords.get('cmd')
if cmd_keyword:
# User has defined a custom response, use it with formatting
response = self.bot.command_manager.format_keyword_response(cmd_keyword, message)
return await self.send_response(message, response)
# Fallback to dynamic command list if no custom keyword is defined
# Get max message length to ensure we fit within limits
max_length = self.get_max_message_length(message)
# Reserve space for "Available commands: " prefix
available_length = max_length - len("Available commands: ")
commands_list = self._get_commands_list(max_length=available_length)
response = f"Available commands: {commands_list}"
return await self.send_response(message, response)
except Exception as e:
self.logger.error(f"Error executing cmd command: {e}")
error_msg = self.translate('errors.execution_error', command='cmd', error=str(e))
return await self.send_response(message, error_msg)

View File

@@ -11,9 +11,14 @@ from ..models import MeshMessage
class HfcondCommand(BaseCommand):
"""Command to get HF band conditions"""
# Plugin metadata
name = "hfcond"
keywords = ['hfcond']
description = "Get HF band conditions for ham radio"
category = "solar"
def __init__(self, bot):
super().__init__(bot)
self.keywords = ['hfcond']
async def execute(self, message: MeshMessage) -> bool:
"""Execute the hfcond command"""

View File

@@ -11,9 +11,14 @@ from ..models import MeshMessage
class SunCommand(BaseCommand):
"""Command to get sun information"""
# Plugin metadata
name = "sun"
keywords = ['sun']
description = "Get sunrise/sunset times"
category = "solar"
def __init__(self, bot):
super().__init__(bot)
self.keywords = ['sun']
async def execute(self, message: MeshMessage) -> bool:
"""Execute the sun command"""

View File

@@ -87,7 +87,9 @@ class PluginLoader:
def _validate_plugin(self, plugin_class: Type[BaseCommand]) -> List[str]:
"""
Validate a plugin class has required attributes.
Validate a plugin class has required attributes before instantiation.
All plugins should define name and keywords as class attributes (the standard convention).
We validate leniently here and re-check after instantiation to catch any edge cases.
Args:
plugin_class: The plugin class to validate
@@ -97,28 +99,51 @@ class PluginLoader:
"""
errors = []
required_attrs = ['name', 'keywords', 'description', 'execute']
for attr in required_attrs:
if not hasattr(plugin_class, attr):
errors.append(f"Missing required attribute: {attr}")
if hasattr(plugin_class, 'name'):
# Check if name is a class attribute (not just instance attribute)
name_value = getattr(plugin_class, 'name', None)
if not name_value:
errors.append("Plugin 'name' attribute is empty")
if hasattr(plugin_class, 'keywords'):
keywords = getattr(plugin_class, 'keywords', None)
if not isinstance(keywords, list):
errors.append("Plugin 'keywords' must be a list")
elif not keywords:
errors.append("Plugin 'keywords' list is empty")
if hasattr(plugin_class, 'execute'):
# Check for execute method (must exist as class method)
if not hasattr(plugin_class, 'execute'):
errors.append(f"Missing required attribute: execute")
elif hasattr(plugin_class, 'execute'):
if not inspect.iscoroutinefunction(plugin_class.execute):
errors.append("Plugin 'execute' method must be async")
# Check for keywords type if it exists (allow empty list for system commands)
if hasattr(plugin_class, 'keywords'):
keywords = getattr(plugin_class, 'keywords', None)
if keywords is not None and not isinstance(keywords, list):
errors.append("Plugin 'keywords' must be a list")
# Don't error on empty keywords - some commands (like greeter) intentionally have none
# Note: We don't check name/keywords presence here since they may be set in __init__
# or derived from class name. Post-instantiation validation will catch missing attributes.
return errors
def _validate_plugin_instance(self, plugin_instance: BaseCommand, plugin_name: str) -> List[str]:
"""
Validate a plugin instance after instantiation.
Ensures required attributes are present and correctly typed.
Note: Name may be derived from class name if not explicitly set.
Args:
plugin_instance: The instantiated plugin
plugin_name: The file name of the plugin (for error messages)
Returns:
List of validation errors (empty if valid)
"""
errors = []
# Check name - must be set (either as class attribute, in __init__, or derived from class name)
if not hasattr(plugin_instance, 'name') or not plugin_instance.name:
errors.append("Plugin 'name' attribute is empty or not set")
# Check keywords - must be a list (can be empty for system commands like greeter)
if not hasattr(plugin_instance, 'keywords'):
errors.append("Plugin 'keywords' attribute is missing")
elif not isinstance(plugin_instance.keywords, list):
errors.append("Plugin 'keywords' must be a list")
# Allow empty keywords - some system commands intentionally have none
return errors
def load_plugin(self, plugin_name: str, from_alternatives: bool = False) -> Optional[BaseCommand]:
@@ -157,7 +182,7 @@ class PluginLoader:
self._failed_plugins[plugin_name] = error_msg
return None
# Validate plugin class before instantiation
# Validate plugin class before instantiation (basic checks)
validation_errors = self._validate_plugin(command_class)
if validation_errors:
error_msg = f"Plugin validation failed: {', '.join(validation_errors)}"
@@ -168,12 +193,22 @@ class PluginLoader:
# Instantiate the command
plugin_instance = command_class(self.bot)
# Set name from class name if not set (before validation)
if not hasattr(plugin_instance, 'name') or not plugin_instance.name:
# Use the class name as the plugin name if not specified
derived_name = command_class.__name__.lower().replace('command', '')
plugin_instance.name = derived_name
# Validate plugin instance after instantiation (catches attributes set in __init__)
instance_validation_errors = self._validate_plugin_instance(plugin_instance, plugin_name)
if instance_validation_errors:
error_msg = f"Plugin instance validation failed: {', '.join(instance_validation_errors)}"
self.logger.error(f"Failed to load plugin '{plugin_name}': {error_msg}")
self._failed_plugins[plugin_name] = error_msg
return None
# Validate plugin metadata
metadata = plugin_instance.get_metadata()
if not metadata.get('name'):
# Use the class name as the plugin name if not specified
metadata['name'] = command_class.__name__.lower().replace('command', '')
plugin_instance.name = metadata['name']
source = "alternatives" if from_alternatives else "default"
self.logger.info(f"Successfully loaded plugin: {metadata['name']} from {plugin_name} ({source})")

View File

@@ -5,6 +5,8 @@ Controls how often messages can be sent to prevent spam
"""
import time
import asyncio
from typing import Optional
class RateLimiter:
@@ -13,10 +15,15 @@ class RateLimiter:
def __init__(self, seconds: int):
self.seconds = seconds
self.last_send = 0
self._total_sends = 0
self._total_throttled = 0
def can_send(self) -> bool:
"""Check if we can send a message"""
return time.time() - self.last_send >= self.seconds
can = time.time() - self.last_send >= self.seconds
if not can:
self._total_throttled += 1
return can
def time_until_next(self) -> float:
"""Get time until next allowed send"""
@@ -26,6 +33,17 @@ class RateLimiter:
def record_send(self):
"""Record that we sent a message"""
self.last_send = time.time()
self._total_sends += 1
def get_stats(self) -> dict:
"""Get rate limiter statistics"""
total_attempts = self._total_sends + self._total_throttled
throttle_rate = self._total_throttled / max(1, total_attempts)
return {
'total_sends': self._total_sends,
'total_throttled': self._total_throttled,
'throttle_rate': throttle_rate
}
class BotTxRateLimiter:
@@ -34,10 +52,15 @@ class BotTxRateLimiter:
def __init__(self, seconds: float = 1.0):
self.seconds = seconds
self.last_tx = 0
self._total_tx = 0
self._total_throttled = 0
def can_tx(self) -> bool:
"""Check if bot can transmit a message"""
return time.time() - self.last_tx >= self.seconds
can = time.time() - self.last_tx >= self.seconds
if not can:
self._total_throttled += 1
return can
def time_until_next_tx(self) -> float:
"""Get time until next allowed transmission"""
@@ -47,14 +70,24 @@ class BotTxRateLimiter:
def record_tx(self):
"""Record that bot transmitted a message"""
self.last_tx = time.time()
self._total_tx += 1
async def wait_for_tx(self):
"""Wait until bot can transmit (async)"""
import asyncio
while not self.can_tx():
wait_time = self.time_until_next_tx()
if wait_time > 0:
await asyncio.sleep(wait_time + 0.05) # Small buffer
def get_stats(self) -> dict:
"""Get rate limiter statistics"""
total_attempts = self._total_tx + self._total_throttled
throttle_rate = self._total_throttled / max(1, total_attempts)
return {
'total_tx': self._total_tx,
'total_throttled': self._total_throttled,
'throttle_rate': throttle_rate
}
class NominatimRateLimiter:
@@ -67,11 +100,22 @@ class NominatimRateLimiter:
def __init__(self, seconds: float = 1.1):
self.seconds = seconds
self.last_request = 0
self._lock = None # Will be set to asyncio.Lock if needed
self._lock: Optional[asyncio.Lock] = None
self._total_requests = 0
self._total_throttled = 0
def _get_lock(self) -> asyncio.Lock:
"""Lazily initialize the async lock"""
if self._lock is None:
self._lock = asyncio.Lock()
return self._lock
def can_request(self) -> bool:
"""Check if we can make a Nominatim request"""
return time.time() - self.last_request >= self.seconds
can = time.time() - self.last_request >= self.seconds
if not can:
self._total_throttled += 1
return can
def time_until_next(self) -> float:
"""Get time until next allowed request"""
@@ -81,19 +125,38 @@ class NominatimRateLimiter:
def record_request(self):
"""Record that we made a Nominatim request"""
self.last_request = time.time()
self._total_requests += 1
async def wait_for_request(self):
"""Wait until we can make a Nominatim request (async)"""
import asyncio
while not self.can_request():
wait_time = self.time_until_next()
if wait_time > 0:
await asyncio.sleep(wait_time + 0.05) # Small buffer
async def wait_and_request(self) -> None:
"""Wait until a request can be made, then mark request time (thread-safe)"""
async with self._get_lock():
current_time = time.time()
time_since_last = current_time - self.last_request
if time_since_last < self.seconds:
await asyncio.sleep(self.seconds - time_since_last)
self.last_request = time.time()
self._total_requests += 1
def wait_for_request_sync(self):
"""Wait until we can make a Nominatim request (synchronous)"""
import time as time_module
while not self.can_request():
wait_time = self.time_until_next()
if wait_time > 0:
time_module.sleep(wait_time + 0.05) # Small buffer
time.sleep(wait_time + 0.05) # Small buffer
def get_stats(self) -> dict:
"""Get rate limiter statistics"""
total_attempts = self._total_requests + self._total_throttled
throttle_rate = self._total_throttled / max(1, total_attempts)
return {
'total_requests': self._total_requests,
'total_throttled': self._total_throttled,
'throttle_rate': throttle_rate
}