Files
meshcore-bot/modules/plugin_loader.py
agessaman 4ccfe86494 Enhance weather command functionality with provider selection and delegation
- Added a new configuration option in config.ini.example for selecting the weather provider (NOAA or Open-Meteo), defaulting to NOAA.
- Updated WxCommand to delegate to wx_international when the Open-Meteo provider is selected, improving global weather command support.
- Implemented fallback behavior to use wx_international as the wx command if the default wx command is missing, enhancing user experience.
- Improved logging to provide clearer information on command delegation and provider usage.
2025-12-28 21:45:22 -08:00

374 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Plugin loader for dynamic command discovery and loading
Handles scanning, loading, and registering command plugins
"""
import os
import sys
import importlib
import importlib.util
from pathlib import Path
from typing import Dict, List, Any, Optional, Type
import logging
from .commands.base_command import BaseCommand
class PluginLoader:
"""Handles dynamic loading and discovery of command plugins"""
def __init__(self, bot, commands_dir: str = None):
self.bot = bot
self.logger = bot.logger
self.commands_dir = commands_dir or os.path.join(os.path.dirname(__file__), 'commands')
self.alternatives_dir = os.path.join(self.commands_dir, 'alternatives')
self.loaded_plugins: Dict[str, BaseCommand] = {}
self.plugin_metadata: Dict[str, Dict[str, Any]] = {}
self.keyword_mappings: Dict[str, str] = {} # keyword -> plugin_name
self.plugin_overrides: Dict[str, str] = {} # plugin_name -> alternative_file_name
self._load_plugin_overrides()
def _load_plugin_overrides(self):
"""Load plugin override configuration from config file"""
self.plugin_overrides = {}
try:
if self.bot.config.has_section('Plugin_Overrides'):
for command_name, alternative_file in self.bot.config.items('Plugin_Overrides'):
# Remove .py extension if present
if alternative_file.endswith('.py'):
alternative_file = alternative_file[:-3]
self.plugin_overrides[command_name.strip()] = alternative_file.strip()
self.logger.info(f"Plugin override configured: {command_name} -> {alternative_file}")
except Exception as e:
self.logger.warning(f"Error loading plugin overrides: {e}")
def discover_plugins(self) -> List[str]:
"""Discover all Python files in the commands directory that could be plugins"""
plugin_files = []
commands_path = Path(self.commands_dir)
if not commands_path.exists():
self.logger.error(f"Commands directory does not exist: {self.commands_dir}")
return plugin_files
# Scan for Python files (excluding __init__.py and base_command.py)
for file_path in commands_path.glob("*.py"):
if file_path.name not in ["__init__.py", "base_command.py", "plugin_loader.py"]:
plugin_files.append(file_path.stem)
self.logger.info(f"Discovered {len(plugin_files)} potential plugin files: {plugin_files}")
return plugin_files
def discover_alternative_plugins(self) -> List[str]:
"""Discover all Python files in the alternatives directory that could be plugins
Note: Plugins in the 'inactive' subdirectory are ignored
"""
plugin_files = []
alternatives_path = Path(self.alternatives_dir)
if not alternatives_path.exists():
# Alternatives directory doesn't exist yet, that's okay
return plugin_files
# Scan for Python files (excluding __init__.py)
# Note: glob("*.py") only matches files in the current directory, not subdirectories
# So the 'inactive' subdirectory is automatically excluded
for file_path in alternatives_path.glob("*.py"):
if file_path.name not in ["__init__.py"]:
plugin_files.append(file_path.stem)
if plugin_files:
self.logger.info(f"Discovered {len(plugin_files)} alternative plugin files: {plugin_files}")
return plugin_files
def load_plugin(self, plugin_name: str, from_alternatives: bool = False) -> Optional[BaseCommand]:
"""Load a single plugin by name
Args:
plugin_name: Name of the plugin file (without .py extension)
from_alternatives: If True, load from alternatives directory; if False, load from commands directory
"""
try:
# Construct the full module path
if from_alternatives:
module_path = f"modules.commands.alternatives.{plugin_name}"
else:
module_path = f"modules.commands.{plugin_name}"
# Check if module is already loaded
if module_path in sys.modules:
module = sys.modules[module_path]
else:
# Import the module
module = importlib.import_module(module_path)
# Find the command class (should be the only class that inherits from BaseCommand)
command_class = None
for name, obj in inspect.getmembers(module, inspect.isclass):
if (issubclass(obj, BaseCommand) and
obj != BaseCommand and
obj.__module__ == module_path):
command_class = obj
break
if not command_class:
self.logger.warning(f"No valid command class found in {plugin_name}")
return None
# Instantiate the command
plugin_instance = command_class(self.bot)
# Validate plugin metadata
metadata = plugin_instance.get_metadata()
if not metadata.get('name'):
# Use the class name as the plugin name if not specified
metadata['name'] = command_class.__name__.lower().replace('command', '')
plugin_instance.name = metadata['name']
source = "alternatives" if from_alternatives else "default"
self.logger.info(f"Successfully loaded plugin: {metadata['name']} from {plugin_name} ({source})")
return plugin_instance
except Exception as e:
self.logger.error(f"Failed to load plugin {plugin_name}: {e}")
return None
def load_all_plugins(self) -> Dict[str, BaseCommand]:
"""Load all discovered plugins, with alternative plugins taking priority when configured"""
# First, discover all default and alternative plugins
default_plugin_files = self.discover_plugins()
alternative_plugin_files = self.discover_alternative_plugins()
# Build a map of plugin names to their file names for default plugins
default_plugin_map = {} # plugin_name -> file_name
loaded_plugins = {}
# First pass: Load all default plugins and build the map
for plugin_file in default_plugin_files:
plugin_instance = self.load_plugin(plugin_file, from_alternatives=False)
if plugin_instance:
metadata = plugin_instance.get_metadata()
plugin_name = metadata['name']
default_plugin_map[plugin_name] = plugin_file
loaded_plugins[plugin_name] = plugin_instance
self.plugin_metadata[plugin_name] = metadata
# Second pass: Check for overrides and load alternative plugins
# Check config-based overrides first
for plugin_name, alternative_file in self.plugin_overrides.items():
if alternative_file in alternative_plugin_files:
# Load the alternative plugin
alt_instance = self.load_plugin(alternative_file, from_alternatives=True)
if alt_instance:
alt_metadata = alt_instance.get_metadata()
alt_plugin_name = alt_metadata['name']
# If the alternative plugin has a different name, use the override name
# This allows wx_international (name="gwx") to be used as "wx" when configured
if alt_plugin_name != plugin_name:
self.logger.info(
f"Alternative plugin {alternative_file} (name='{alt_plugin_name}') "
f"is being used to override '{plugin_name}'. Registering as '{plugin_name}'."
)
# Update the instance's name and metadata to match the override name
alt_instance.name = plugin_name
alt_metadata['name'] = plugin_name
# Replace the default plugin with the alternative
if plugin_name in loaded_plugins:
self.logger.info(f"Replacing default plugin '{plugin_name}' with alternative '{alternative_file}'")
loaded_plugins[plugin_name] = alt_instance
self.plugin_metadata[plugin_name] = alt_metadata
else:
self.logger.warning(f"Failed to load alternative plugin '{alternative_file}' for '{plugin_name}'")
else:
self.logger.warning(
f"Alternative plugin '{alternative_file}' not found in alternatives directory "
f"for override of '{plugin_name}'"
)
# Third pass: Load alternative plugins that aren't overriding anything
# (standalone alternative plugins)
for alt_file in alternative_plugin_files:
# Skip if this alternative is already loaded as an override
if alt_file in self.plugin_overrides.values():
continue
alt_instance = self.load_plugin(alt_file, from_alternatives=True)
if alt_instance:
alt_metadata = alt_instance.get_metadata()
alt_plugin_name = alt_metadata['name']
# Special case: If wx_international is available and wx command is missing,
# automatically use it as wx (fallback behavior)
if alt_file == 'wx_international' and 'wx' not in loaded_plugins:
self.logger.info(
f"Default 'wx' command not found, using 'wx_international' as fallback for 'wx' command"
)
# Update the instance to use 'wx' name and keywords
alt_instance.name = 'wx'
alt_metadata['name'] = 'wx'
# Update keywords to match wx command
alt_instance.keywords = ['wx', 'weather', 'wxa', 'wxalert']
alt_metadata['keywords'] = ['wx', 'weather', 'wxa', 'wxalert']
alt_metadata['description'] = "Get weather information for any location (usage: wx Tokyo)"
loaded_plugins['wx'] = alt_instance
self.plugin_metadata['wx'] = alt_metadata
continue
# If an alternative plugin has the same name as a default plugin,
# it will replace it (unless already overridden by config)
if alt_plugin_name in loaded_plugins:
if alt_plugin_name not in self.plugin_overrides:
self.logger.info(
f"Alternative plugin '{alt_file}' replaces default plugin '{alt_plugin_name}' "
f"(same plugin name detected)"
)
else:
self.logger.info(f"Loading standalone alternative plugin: {alt_plugin_name} from {alt_file}")
loaded_plugins[alt_plugin_name] = alt_instance
self.plugin_metadata[alt_plugin_name] = alt_metadata
# Build keyword mappings for all loaded plugins
for plugin_name, plugin_instance in loaded_plugins.items():
metadata = self.plugin_metadata[plugin_name]
self._build_keyword_mappings(plugin_name, metadata)
self.loaded_plugins = loaded_plugins
self.logger.info(f"Loaded {len(loaded_plugins)} plugins: {list(loaded_plugins.keys())}")
return loaded_plugins
def _build_keyword_mappings(self, plugin_name: str, metadata: Dict[str, Any]):
"""Build keyword to plugin name mappings"""
# Map keywords to plugin name
for keyword in metadata.get('keywords', []):
self.keyword_mappings[keyword.lower()] = plugin_name
# Map aliases to plugin name
for alias in metadata.get('aliases', []):
self.keyword_mappings[alias.lower()] = plugin_name
def get_plugin_by_keyword(self, keyword: str) -> Optional[BaseCommand]:
"""Get a plugin instance by keyword"""
plugin_name = self.keyword_mappings.get(keyword.lower())
if plugin_name:
return self.loaded_plugins.get(plugin_name)
return None
def get_plugin_by_name(self, name: str) -> Optional[BaseCommand]:
"""Get a plugin instance by name"""
return self.loaded_plugins.get(name)
def get_all_plugins(self) -> Dict[str, BaseCommand]:
"""Get all loaded plugins"""
return self.loaded_plugins.copy()
def get_plugin_metadata(self, plugin_name: str = None) -> Dict[str, Any]:
"""Get metadata for a specific plugin or all plugins"""
if plugin_name:
return self.plugin_metadata.get(plugin_name, {})
return self.plugin_metadata.copy()
def get_plugins_by_category(self, category: str) -> Dict[str, BaseCommand]:
"""Get all plugins in a specific category"""
return {
name: plugin for name, plugin in self.loaded_plugins.items()
if plugin.category == category
}
def reload_plugin(self, plugin_name: str) -> bool:
"""Reload a specific plugin"""
try:
# Remove from loaded plugins
if plugin_name in self.loaded_plugins:
del self.loaded_plugins[plugin_name]
# Remove from metadata
if plugin_name in self.plugin_metadata:
del self.plugin_metadata[plugin_name]
# Remove keyword mappings
keywords_to_remove = []
for keyword, mapped_name in self.keyword_mappings.items():
if mapped_name == plugin_name:
keywords_to_remove.append(keyword)
for keyword in keywords_to_remove:
del self.keyword_mappings[keyword]
# Check if this plugin should be loaded from alternatives
from_alternatives = False
if plugin_name in self.plugin_overrides:
# This plugin is overridden, reload from alternatives
alternative_file = self.plugin_overrides[plugin_name]
plugin_instance = self.load_plugin(alternative_file, from_alternatives=True)
else:
# Try to find the plugin file name
# First check if it's in default plugins
default_plugins = self.discover_plugins()
plugin_file = None
for df in default_plugins:
test_instance = self.load_plugin(df, from_alternatives=False)
if test_instance and test_instance.get_metadata().get('name') == plugin_name:
plugin_file = df
break
if plugin_file:
plugin_instance = self.load_plugin(plugin_file, from_alternatives=False)
else:
# Try alternatives
alt_plugins = self.discover_alternative_plugins()
for alt in alt_plugins:
test_instance = self.load_plugin(alt, from_alternatives=True)
if test_instance and test_instance.get_metadata().get('name') == plugin_name:
plugin_instance = self.load_plugin(alt, from_alternatives=True)
break
else:
plugin_instance = None
if plugin_instance:
metadata = plugin_instance.get_metadata()
self.loaded_plugins[plugin_name] = plugin_instance
self.plugin_metadata[plugin_name] = metadata
self._build_keyword_mappings(plugin_name, metadata)
self.logger.info(f"Successfully reloaded plugin: {plugin_name}")
return True
else:
self.logger.error(f"Failed to reload plugin: {plugin_name}")
return False
except Exception as e:
self.logger.error(f"Error reloading plugin {plugin_name}: {e}")
return False
def validate_plugin(self, plugin_instance: BaseCommand) -> List[str]:
"""Validate a plugin instance and return any issues"""
issues = []
metadata = plugin_instance.get_metadata()
# Check required metadata
if not metadata.get('name'):
issues.append("Plugin missing 'name' metadata")
if not metadata.get('description'):
issues.append("Plugin missing 'description' metadata")
# Check if execute method is implemented
if not hasattr(plugin_instance, 'execute'):
issues.append("Plugin missing 'execute' method")
# Check for keyword conflicts
for keyword in metadata.get('keywords', []):
if keyword.lower() in self.keyword_mappings:
existing_plugin = self.keyword_mappings[keyword.lower()]
if existing_plugin != metadata['name']:
issues.append(f"Keyword '{keyword}' conflicts with plugin '{existing_plugin}'")
return issues
# Import inspect at module level for the load_plugin method
import inspect