From 8a35dc82703e1275f2b13035cd39fa08ee715924 Mon Sep 17 00:00:00 2001 From: eddieoz Date: Mon, 8 Dec 2025 23:37:41 +0200 Subject: [PATCH] feat: Add security utilities for path validation, SQL injection prevention, and input sanitization. --- config.ini.example | 264 ++++++++++++------------------ modules/commands/base_command.py | 77 ++++++--- modules/core.py | 19 +++ modules/db_manager.py | 61 ++++++- modules/message_handler.py | 19 ++- modules/security_utils.py | 257 +++++++++++++++++++++++++++++ modules/web_viewer/integration.py | 59 ++++++- 7 files changed, 555 insertions(+), 201 deletions(-) create mode 100644 modules/security_utils.py diff --git a/config.ini.example b/config.ini.example index 88d0ba6..3953276 100644 --- a/config.ini.example +++ b/config.ini.example @@ -58,8 +58,8 @@ bot_tx_rate_limit_seconds = 1.0 # Transmission delay in milliseconds before sending messages # Helps prevent message collisions on the mesh network -# Recommended: 400-800ms for busy networks, 100 for quiet networks -tx_delay_ms = 400 +# Recommended: 100-500ms for busy networks, 0 for quiet networks +tx_delay_ms = 250 # DM retry settings for improved reliability (meshcore-2.1.6+) # Maximum number of retry attempts for failed DM sends @@ -90,12 +90,6 @@ bot_longitude = -74.0060 # Set to a lower value if you want to limit channel fetching for performance max_channels = 12 -# Channel refresh interval in seconds -# How often to refresh channel list from device to prevent stale data in database -# Default: 3600 (1 hour). Set to 0 to disable periodic refresh (channels only refreshed on startup) -# This ensures the database stays in sync if channels are changed on the device directly -channel_refresh_interval_seconds = 3600 - # Interval-based advertising settings # Send periodic flood adverts at specified intervals # 0: Disabled (default) @@ -112,7 +106,7 @@ startup_advert = false # device: Device handles auto-addition using standard auto-discovery mode, bot manages contact list capacity (purge old contacts when near limits) # bot: Bot automatically adds new companion contacts to device, bot manages contact list capacity (purge old contacts when near limits) # false: Manual mode - no automatic actions, use !repeater commands to manage contacts (default) -auto_manage_contacts = bot +auto_manage_contacts = false [Localization] # Language code for bot responses (en, es, es-MX, es-ES, fr, de, ja, etc.) @@ -132,13 +126,22 @@ translation_path = translations/ [Admin_ACL] # Admin Access Control List (ACL) for restricted commands # Only users with public keys listed here can execute admin commands -# Format: comma-separated list of public keys (without spaces) -# Example: 3e14a5fa9c52e7b6b8e14f987cd12ab1e3dc9fb64a2bbf0e19dca85e9f43a120, another key here +# +# SECURITY IMPORTANT: +# - Public keys MUST be exactly 64 hexadecimal characters (ed25519 format) +# - Invalid formats will be rejected with error logs +# - Empty or whitespace-only values disable admin access +# - Keys are case-insensitive (normalized to lowercase) +# +# Format: comma-separated list of 64-character hex public keys (without spaces) +# Example: f5d2b56d19b24412756933e917d4632e088cdd5daeadc9002feca73bf5d2b56d,1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef +# +# IMPORTANT: Leave blank to disable all admin commands. Set your actual admin pubkey(s) here. admin_pubkeys = # Commands that require admin access (comma-separated) # These commands will only work for users in the admin_pubkeys list -admin_commands = repeater,webviewer +admin_commands = repeater,webviewer,webviewer [Plugin_Overrides] # Plugin Overrides - Use alternative plugin implementations @@ -260,6 +263,73 @@ colored_output = true # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL meshcore_log_level = INFO +[Greeter_Command] +# Enable or disable the greeter command +# true: Bot will greet users on their first public channel message +# false: Greeter is disabled +enabled = false + +# Channels where greetings should occur (comma-separated) +# IMPORTANT: Leave commented out (or omit entirely) to use global monitor_channels (default behavior) +# If uncommented with empty value (channels = ), command will be DM-only +# Comma-separated list to restrict to specific channels (only greeter command works there) +# Example: channels = general,welcome,newbies +# If not specified, uses the channels from [Channels] monitor_channels setting +# channels = + +# Greeting message template (default for all channels) +# Available fields: {sender} - the user's name/ID +# For multi-part greetings, separate messages with pipe (|) +# Example (single): "Welcome to the mesh, @[{sender}]!" +# Example (multi-part): "Welcome to the mesh, @[{sender}]!|This is a great place to chat.|Use !help for commands." +greeting_message = Welcome to the mesh, @[{sender}]! + +# Channel-specific greeting messages (optional) +# Format: channel_name:greeting_message,channel_name2:greeting_message2 +# If a channel has a specific greeting, it will be used instead of the default greeting_message +# Example: Public:Welcome to Public channel, @[{sender}]!|general:Welcome to general, @[{sender}]! +# Multi-part greetings are supported per channel using pipe (|) separator +# Leave empty to use greeting_message for all channels +channel_greetings = + +# Per-channel greetings (tracking behavior) +# false: Greet each user only once globally (default - user gets one greeting total) +# true: Greet each user once per channel (user can be greeted on each channel separately) +# Note: This controls tracking, not the greeting message itself. Use channel_greetings for different messages. +per_channel_greetings = false + +# Include mesh network information in greeting +# true: Add mesh statistics to greeting (total contacts, repeaters, etc.) +# false: Only send the greeting message +include_mesh_info = true + +# Mesh info format template +# Available fields: {total_contacts}, {repeaters}, {companions}, {recent_activity_24h} +# Example: "\n\nMesh Info: {total_contacts} contacts, {repeaters} repeaters" +# Note: Mesh info is appended to the last greeting message part +mesh_info_format = \n\nMesh Info: {total_contacts} contacts, {repeaters} repeaters, {recent_activity_24h} active in last 24h + +# Rollout period in days +# When greeter is first enabled on an active mesh, this sets how many days +# to listen and mark all active users as already greeted before beginning +# to greet new users. This prevents greeting everyone on an established mesh. +# Set to 0 to disable rollout (will greet all new users immediately) +# Note: Use auto_backfill to mark historical users and shorten/eliminate rollout period +rollout_days = 7 + +# Auto-backfill from historical message_stats data +# true: Automatically mark all users who have posted on public channels in the past +# false: Only mark users during rollout period (default) +# This allows shortening or eliminating the rollout period by using existing data +auto_backfill = false + +# Backfill lookback period in days +# Number of days to look back when auto-backfilling (0 = all time) +# Only used if auto_backfill = true +# Example: 30 = only mark users who posted in last 30 days +# Example: 0 = mark all users who have ever posted (all time) +backfill_lookback_days = 30 + [Custom_Syntax] # Custom syntax patterns for special message formats # Format: pattern = "response_format" @@ -311,55 +381,6 @@ repeater_prefix_api_url = # Recommended: 1-6 hours (data doesn't change frequently) repeater_prefix_cache_hours = 1 -[Feed_Manager] -# Enable feed manager functionality -# true: Feed manager is enabled and will poll feeds -# false: Feed manager is disabled -feed_manager_enabled = false - -# Default check interval in seconds (5 minutes) -# How often to check feeds for new items -# Individual feeds can override this with their own interval -default_check_interval_seconds = 300 - -# Maximum number of items to process per feed per check -# Prevents overwhelming channels with too many items at once -max_items_per_check = 10 - -# Default output format for feed messages -# Placeholders: {title}, {body}, {date}, {link}, {emoji} -# Shortening functions: {field|truncate:N}, {field|word_wrap:N}, {field|first_words:N} -# Example: "{emoji} {body|truncate:100} - {date}\n{link|truncate:50}" -# Default uses body instead of title for main content -default_output_format = {emoji} {body|truncate:100} - {date}\n{link|truncate:50} - -# Default message send interval in seconds -# How long to wait between sending queued messages from the same feed -# Prevents rate limiting by spacing out message sends -# Individual feeds can override this with their own interval -default_message_send_interval_seconds = 2.0 - -# Request timeout in seconds -# Maximum time to wait for feed requests -feed_request_timeout = 30 - -# User-Agent string for HTTP requests -# Identifies the bot when fetching feeds -feed_user_agent = MeshCoreBot/1.0 FeedManager - -# Rate limiting: minimum seconds between requests to same domain -# Prevents overwhelming feed sources -feed_rate_limit_seconds = 5 - -# Maximum message length (mesh limit is 130) -# Messages longer than this will be truncated -max_message_length = 130 - -# Enable/disable feed command -# true: Feed command is available (admin only) -# false: Feed command is disabled -feed_command_enabled = true - [Prefix_Command] # Enable or disable repeater geolocation in prefix command # true: Show city names with repeaters when location data is available @@ -579,7 +600,7 @@ low_confidence_symbol = ❓ # Enable "p" shortcut for path command (similar to "t" for test command) # true: Respond to just "p" or "p " as a shortcut for "path" # false: Only respond to "path", "decode", or "route" keywords (default) -enable_p_shortcut = true +enable_p_shortcut = false [Hacker_Command] # Enable or disable the hacker command @@ -622,116 +643,37 @@ alert_enabled = false [Web_Viewer] # Enable or disable the web data viewer +# SECURITY NOTE: Web viewer has NO AUTHENTICATION built-in +# Only enable if you understand the security implications # true: web viewer is available for viewing bot data -# false: web viewer is disabled -enabled = true +# false: web viewer is disabled (RECOMMENDED for production) +enabled = false # Web viewer host address -# 127.0.0.1: Only accessible from localhost -# 0.0.0.0: Accessible from any network interface +# SECURITY WARNING: +# - 127.0.0.1: Only accessible from localhost (SECURE - recommended) +# - 0.0.0.0: Accessible from any network interface (INSECURE - exposes all bot data to network) +# +# Using 0.0.0.0 will expose: +# - All messages and their contents +# - Contact list and repeater information +# - Real-time packet stream +# - Bot configuration details +# +# WITHOUT ANY AUTHENTICATION OR ENCRYPTION host = 127.0.0.1 # Web viewer port -# Default: 8080 (viewer uses port 8080) +# Must be between 1024-65535 (non-privileged ports) +# Default: 8080 port = 8080 # Enable debug mode for the web viewer # true: Enable Flask debug mode (auto-reload on changes) -# false: Production mode +# false: Production mode (recommended) debug = false # Auto-start web viewer with bot # true: Start web viewer automatically when bot starts -# false: Start web viewer manually -auto_start = false - -[Greeter_Command] -# Enable or disable the greeter command -# true: Bot will greet users on their first public channel message -# false: Greeter is disabled -enabled = false - -# Channels where greetings should occur (comma-separated) -# IMPORTANT: Leave commented out (or omit entirely) to use global monitor_channels (default behavior) -# If uncommented with empty value (channels = ), command will be DM-only -# Comma-separated list to restrict to specific channels (only greeter command works there) -# Example: channels = general,welcome,newbies -# If not specified, uses the channels from [Channels] monitor_channels setting -# channels = - -# Greeting message template (default for all channels) -# Available fields: {sender} - the user's name/ID -# For multi-part greetings, separate messages with pipe (|) -# Example (single): "Welcome to the mesh, @[{sender}]!" -# Example (multi-part): "Welcome to the mesh, @[{sender}]!|This is a great place to chat.|Use !help for commands." -greeting_message = Welcome to the mesh, @[{sender}]! - -# Channel-specific greeting messages (optional) -# Format: channel_name:greeting_message,channel_name2:greeting_message2 -# If a channel has a specific greeting, it will be used instead of the default greeting_message -# Example: Public:Welcome to Public channel, @[{sender}]!|general:Welcome to general, @[{sender}]! -# Multi-part greetings are supported per channel using pipe (|) separator -# Leave empty to use greeting_message for all channels -channel_greetings = - -# Per-channel greetings (tracking behavior) -# false: Greet each user only once globally (default - user gets one greeting total) -# true: Greet each user once per channel (user can be greeted on each channel separately) -# Note: This controls tracking, not the greeting message itself. Use channel_greetings for different messages. -per_channel_greetings = false - -# Include mesh network information in greeting -# true: Add mesh statistics to greeting (total contacts, repeaters, etc.) -# false: Only send the greeting message -include_mesh_info = true - -# Mesh info format template -# Available fields: {total_contacts}, {repeaters}, {companions}, {recent_activity_24h} -# Example: "\n\nMesh Info: {total_contacts} contacts, {repeaters} repeaters" -# Note: Mesh info is appended to the last greeting message part -mesh_info_format = \n\nMesh Info: {total_contacts} contacts, {repeaters} repeaters, {recent_activity_24h} active in last 24h - -# Rollout period in days -# When greeter is first enabled on an active mesh, this sets how many days -# to listen and mark all active users as already greeted before beginning -# to greet new users. This prevents greeting everyone on an established mesh. -# Set to 0 to disable rollout (will greet all new users immediately) -# Note: Use auto_backfill to mark historical users and shorten/eliminate rollout period -rollout_days = 7 - -# Auto-backfill from historical message_stats data -# true: Automatically mark all users who have posted on public channels in the past -# false: Only mark users during rollout period (default) -# This allows shortening or eliminating the rollout period by using existing data -auto_backfill = false - -# Backfill lookback period in days -# Number of days to look back when auto-backfilling (0 = all time) -# Only used if auto_backfill = true -# Example: 30 = only mark users who posted in last 30 days -# Example: 0 = mark all users who have ever posted (all time) -backfill_lookback_days = 30 - -# Dead air delay in seconds -# Wait this many seconds before sending a greeting to a new user -# This allows time for other users to greet the new user first -# Set to 0 to send greetings immediately (default behavior) -# Example: 30 = wait 30 seconds before greeting -dead_air_delay_seconds = 0 - -# Defer to human greeting -# If enabled and dead_air_delay_seconds > 0, the bot will not send a greeting -# if another user mentions the new user's name within the dead air delay period -# true: Check for human greetings and defer if found -# false: Always send bot greeting after delay (default) -# Note: Only effective when dead_air_delay_seconds > 0 -defer_to_human_greeting = false - -# Levenshtein distance for name matching -# Maximum edit distance allowed when checking if a user has been greeted before -# Prevents duplicate greetings for users who make small changes to their name -# Set to 0 to disable fuzzy matching (exact name match only, default) -# Example: 2 = "John" and "Jon" (distance 1) or "John" and "Jhon" (distance 1) would match -# Example: 3 = "Alice" and "Alicia" (distance 2) would match -# Recommended: 1-3 for most use cases -levenshtein_distance = 0 \ No newline at end of file +# false: Start web viewer manually (recommended) +auto_start = false \ No newline at end of file diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index 5e58e1a..77e5022 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from typing import Optional, List, Dict, Any, Tuple from datetime import datetime import pytz +import re from ..models import MeshMessage @@ -415,51 +416,83 @@ class BaseCommand(ABC): return False def _check_admin_access(self, message: MeshMessage) -> bool: - """Check if the message sender has admin access""" + """ + Check if the message sender has admin access (security-hardened) + + Security features: + - Strict pubkey format validation (64-char hex) + - No fallback to sender_id (prevents spoofing) + - Whitespace/empty config detection + - Normalized comparison (lowercase) + """ + import re # This import is needed for re.match if not hasattr(self.bot, 'config'): return False try: # Get admin pubkeys from config admin_pubkeys = self.bot.config.get('Admin_ACL', 'admin_pubkeys', fallback='') - if not admin_pubkeys: - self.logger.warning("No admin pubkeys configured") + + # Check for empty or whitespace-only configuration + if not admin_pubkeys.strip(): + self.logger.warning("No admin pubkeys configured or empty/whitespace config") return False - # Parse admin pubkeys - admin_pubkey_list = [key.strip() for key in admin_pubkeys.split(',') if key.strip()] + # Parse and VALIDATE admin pubkeys + admin_pubkey_list = [] + for key in admin_pubkeys.split(','): + key = key.strip() + if not key: + continue + + # Validate hex format (64 chars for ed25519 public keys) + if not re.match(r'^[0-9a-fA-F]{64}$', key): + self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...") + continue # Skip invalid keys but continue checking others + + admin_pubkey_list.append(key.lower()) # Normalize to lowercase + if not admin_pubkey_list: - self.logger.warning("No valid admin pubkeys found in config") + self.logger.error("No valid admin pubkeys found in config after validation") return False - # Get sender's public key from message + # Get sender's public key - NEVER fall back to sender_id sender_pubkey = getattr(message, 'sender_pubkey', None) if not sender_pubkey: - # Try to get from sender_id if it's a pubkey - sender_pubkey = getattr(message, 'sender_id', None) - - if not sender_pubkey: - self.logger.warning(f"No sender public key found for message from {message.sender_id}") + self.logger.warning( + f"No sender public key available for {message.sender_id} - " + "admin access denied (missing pubkey)" + ) return False - # Check if sender's pubkey matches any admin key (exact match required for security) - is_admin = False - for admin_key in admin_pubkey_list: - # Only allow exact matches for security - if sender_pubkey == admin_key: - is_admin = True - break + # Validate sender pubkey format + if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey): + self.logger.warning( + f"Invalid sender pubkey format from {message.sender_id}: " + f"{sender_pubkey[:16]}... - admin access denied" + ) + return False + + # Normalize and compare + sender_pubkey_normalized = sender_pubkey.lower() + is_admin = sender_pubkey_normalized in admin_pubkey_list if not is_admin: - self.logger.info(f"Access denied for {message.sender_id} (pubkey: {sender_pubkey[:16]}...) - not in admin ACL") + self.logger.warning( + f"Access denied for {message.sender_id} " + f"(pubkey: {sender_pubkey[:16]}...) - not in admin ACL" + ) else: - self.logger.info(f"Admin access granted for {message.sender_id} (pubkey: {sender_pubkey[:16]}...)") + self.logger.info( + f"Admin access granted for {message.sender_id} " + f"(pubkey: {sender_pubkey[:16]}...)" + ) return is_admin except Exception as e: self.logger.error(f"Error checking admin access: {e}") - return False + return False # Fail securely def _strip_quotes_from_config(self, value: str) -> str: """Strip quotes from config values if present""" diff --git a/modules/core.py b/modules/core.py index e45d3dc..60ea200 100644 --- a/modules/core.py +++ b/modules/core.py @@ -36,6 +36,7 @@ from .i18n import Translator from .solar_conditions import set_config from .web_viewer.integration import WebViewerIntegration from .feed_manager import FeedManager +from .security_utils import validate_safe_path class MeshCoreBot: @@ -58,6 +59,15 @@ class MeshCoreBot: # Initialize database manager first (needed by plugins) db_path = self.config.get('Bot', 'db_path', fallback='meshcore_bot.db') + + # Validate database path for security (prevent path traversal) + try: + db_path = str(validate_safe_path(db_path, base_dir='.', allow_absolute=False)) + except ValueError as e: + self.logger.error(f"Invalid database path: {e}") + self.logger.error("Using default: meshcore_bot.db") + db_path = 'meshcore_bot.db' + self.logger.info(f"Initializing database manager with database: {db_path}") try: self.db_manager = DBManager(self, db_path) @@ -550,6 +560,15 @@ use_zulu_time = false # File handler log_file = self.config.get('Logging', 'log_file', fallback='meshcore_bot.log') + + # Validate log file path for security (prevent path traversal) + try: + log_file = str(validate_safe_path(log_file, base_dir='.', allow_absolute=False)) + except ValueError as e: + self.logger.warning(f"Invalid log file path: {e}") + self.logger.warning("Using default: meshcore_bot.log") + log_file = 'meshcore_bot.log' + file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) diff --git a/modules/db_manager.py b/modules/db_manager.py index 5e65ed3..4756980 100644 --- a/modules/db_manager.py +++ b/modules/db_manager.py @@ -6,6 +6,7 @@ Provides common database operations and table management for the MeshCore Bot import sqlite3 import json +import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any from pathlib import Path @@ -14,6 +15,21 @@ from pathlib import Path class DBManager: """Generalized database manager for common operations""" + # Whitelist of allowed tables for security + ALLOWED_TABLES = { + 'geocoding_cache', + 'generic_cache', + 'bot_metadata', + 'packet_stream', + 'message_stats', + 'greeted_users', + 'repeater_contacts', + 'repeater_interactions', + 'complete_contact_tracking', # Repeater manager + 'daily_stats', # Repeater manager + 'purging_log', # Repeater manager + } + def __init__(self, bot, db_path: str = "meshcore_bot.db"): self.bot = bot self.logger = bot.logger @@ -217,13 +233,18 @@ class DBManager: def cache_geocoding(self, query: str, latitude: float, longitude: float, cache_hours: int = 720): """Cache geocoding result for future use (default: 30 days)""" try: + # Validate cache_hours to prevent SQL injection + if not isinstance(cache_hours, int) or cache_hours < 1 or cache_hours > 87600: # Max 10 years + raise ValueError(f"cache_hours must be an integer between 1 and 87600, got: {cache_hours}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Use parameter binding instead of string formatting cursor.execute(''' INSERT OR REPLACE INTO geocoding_cache (query, latitude, longitude, expires_at) - VALUES (?, ?, ?, datetime('now', '+{} hours')) - '''.format(cache_hours), (query, latitude, longitude)) + VALUES (?, ?, ?, datetime('now', '+' || ? || ' hours')) + ''', (query, latitude, longitude, cache_hours)) conn.commit() except Exception as e: self.logger.error(f"Error caching geocoding: {e}") @@ -249,13 +270,18 @@ class DBManager: def cache_value(self, cache_key: str, cache_value: str, cache_type: str, cache_hours: int = 24): """Cache a value for future use""" try: + # Validate cache_hours to prevent SQL injection + if not isinstance(cache_hours, int) or cache_hours < 1 or cache_hours > 87600: # Max 10 years + raise ValueError(f"cache_hours must be an integer between 1 and 87600, got: {cache_hours}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Use parameter binding instead of string formatting cursor.execute(''' INSERT OR REPLACE INTO generic_cache (cache_key, cache_value, cache_type, expires_at) - VALUES (?, ?, ?, datetime('now', '+{} hours')) - '''.format(cache_hours), (cache_key, cache_value, cache_type)) + VALUES (?, ?, ?, datetime('now', '+' || ? || ' hours')) + ''', (cache_key, cache_value, cache_type, cache_hours)) conn.commit() except Exception as e: self.logger.error(f"Error caching value: {e}") @@ -363,26 +389,49 @@ class DBManager: # Table management methods def create_table(self, table_name: str, schema: str): - """Create a custom table with the given schema""" + """Create a custom table with the given schema (whitelist-protected)""" try: + # Validate table name against whitelist + if table_name not in self.ALLOWED_TABLES: + raise ValueError(f"Table name '{table_name}' not in allowed tables whitelist") + + # Additional validation: ensure table name follows safe naming convention + if not re.match(r'^[a-z_][a-z0-9_]*$', table_name): + raise ValueError(f"Invalid table name format: {table_name}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Table names cannot be parameterized, but we've validated against whitelist cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({schema})') conn.commit() self.logger.info(f"Created table: {table_name}") except Exception as e: self.logger.error(f"Error creating table {table_name}: {e}") + raise def drop_table(self, table_name: str): - """Drop a table (use with caution)""" + """Drop a table (whitelist-protected, use with extreme caution)""" try: + # Validate table name against whitelist + if table_name not in self.ALLOWED_TABLES: + raise ValueError(f"Table name '{table_name}' not in allowed tables whitelist") + + # Additional validation: ensure table name follows safe naming convention + if not re.match(r'^[a-z_][a-z0-9_]*$', table_name): + raise ValueError(f"Invalid table name format: {table_name}") + + # Extra safety: log critical action + self.logger.warning(f"CRITICAL: Dropping table '{table_name}'") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Table names cannot be parameterized, but we've validated against whitelist cursor.execute(f'DROP TABLE IF EXISTS {table_name}') conn.commit() self.logger.info(f"Dropped table: {table_name}") except Exception as e: self.logger.error(f"Error dropping table {table_name}: {e}") + raise def execute_query(self, query: str, params: Tuple = ()) -> List[Dict]: """Execute a custom query and return results as list of dictionaries""" diff --git a/modules/message_handler.py b/modules/message_handler.py index c6c302c..4e63482 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -14,6 +14,7 @@ from meshcore import EventType from .models import MeshMessage from .enums import PayloadType, PayloadVersion, RouteType, AdvertFlags, DeviceRole from .utils import calculate_packet_hash +from .security_utils import sanitize_input class MessageHandler: @@ -251,28 +252,34 @@ class MessageHandler: # Look up contact name from pubkey prefix sender_id = payload.get('pubkey_prefix', '') + sender_name = sender_id # Default to sender_id if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts: for contact_key, contact_data in self.bot.meshcore.contacts.items(): if contact_data.get('public_key', '').startswith(sender_id): # Use the contact name if available, otherwise use adv_name contact_name = contact_data.get('name', contact_data.get('adv_name', sender_id)) - sender_id = contact_name + sender_name = contact_name break # Get the full public key from contacts if available sender_pubkey = payload.get('pubkey_prefix', '') + sender_pubkey = sender_id # Default to sender_id if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts: for contact_key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', '').startswith(sender_pubkey): + if contact_data.get('public_key', '').startswith(sender_id): # Use the full public key from the contact - sender_pubkey = contact_data.get('public_key', sender_pubkey) - self.logger.debug(f"Found full public key for {sender_id}: {sender_pubkey[:16]}...") + sender_pubkey = contact_data.get('public_key', sender_id) + self.logger.debug(f"Found full public key for {sender_name}: {sender_pubkey[:16]}...") break + # Sanitize message content to prevent injection attacks + message_content = payload.get('text', '') + message_content = sanitize_input(message_content, max_length=500, strip_controls=True) + # Convert to our message format message = MeshMessage( - content=payload.get('text', ''), - sender_id=sender_id, + content=message_content, + sender_id=sender_name, sender_pubkey=sender_pubkey, is_dm=True, timestamp=timestamp, diff --git a/modules/security_utils.py b/modules/security_utils.py new file mode 100644 index 0000000..6caf703 --- /dev/null +++ b/modules/security_utils.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +Security Utilities for MeshCore Bot +Provides centralized security validation functions to prevent common attacks +""" + +import re +import ipaddress +import socket +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse +import logging + +logger = logging.getLogger('MeshCoreBot.Security') + + +def validate_external_url(url: str, allow_localhost: bool = False) -> bool: + """ + Validate that URL points to safe external resource (SSRF protection) + + Args: + url: URL to validate + allow_localhost: Whether to allow localhost/private IPs (default: False) + + Returns: + True if URL is safe, False otherwise + + Raises: + ValueError: If URL is invalid or unsafe + """ + try: + parsed = urlparse(url) + + # Only allow HTTP/HTTPS + if parsed.scheme not in ['http', 'https']: + logger.warning(f"URL scheme not allowed: {parsed.scheme}") + return False + + # Reject file:// and other dangerous schemes + if not parsed.netloc: + logger.warning(f"URL missing network location: {url}") + return False + + # Resolve and check if IP is internal/private + try: + ip = socket.gethostbyname(parsed.hostname) + ip_obj = ipaddress.ip_address(ip) + + # If localhost is not allowed, reject private/internal IPs + if not allow_localhost: + # Reject private/internal IPs + if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local: + logger.warning(f"URL resolves to private/internal IP: {ip}") + return False + + # Reject reserved ranges + if ip_obj.is_reserved or ip_obj.is_multicast: + logger.warning(f"URL resolves to reserved/multicast IP: {ip}") + return False + + except socket.gaierror as e: + logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") + return False + + return True + + except Exception as e: + logger.error(f"URL validation failed: {e}") + return False + + +def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool = False) -> Path: + """ + Validate that path is safe and within base directory (path traversal protection) + + Args: + file_path: Path to validate + base_dir: Base directory that path must be within (default: current dir) + allow_absolute: Whether to allow absolute paths outside base_dir + + Returns: + Resolved Path object if safe + + Raises: + ValueError: If path is unsafe or attempts traversal + """ + try: + # Resolve absolute paths + base = Path(base_dir).resolve() + target = Path(file_path).resolve() + + # If absolute paths are not allowed, ensure target is within base + if not allow_absolute: + # Check if target is within base directory + try: + target.relative_to(base) + except ValueError: + raise ValueError( + f"Path traversal detected: {file_path} is not within {base_dir}" + ) + + # Reject certain dangerous system paths + dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] + target_str = str(target) + if any(target_str.startswith(prefix) for prefix in dangerous_prefixes): + raise ValueError(f"Access to system directory denied: {file_path}") + + return target + + except Exception as e: + raise ValueError(f"Invalid or unsafe file path: {file_path} - {e}") + + +def sanitize_input(content: str, max_length: int = 500, strip_controls: bool = True) -> str: + """ + Sanitize user input to prevent injection attacks + + Args: + content: Input string to sanitize + max_length: Maximum allowed length (default: 500 chars) + strip_controls: Whether to remove control characters (default: True) + + Returns: + Sanitized string + """ + if not isinstance(content, str): + content = str(content) + + # Limit length to prevent DoS + if len(content) > max_length: + content = content[:max_length] + logger.debug(f"Input truncated to {max_length} characters") + + # Remove control characters except newline, carriage return, tab + if strip_controls: + # Keep only printable characters plus common whitespace + content = ''.join( + char for char in content + if ord(char) >= 32 or char in '\n\r\t' + ) + + # Remove null bytes (can cause issues in C libraries) + content = content.replace('\x00', '') + + return content.strip() + + +def validate_api_key_format(api_key: str, min_length: int = 16) -> bool: + """ + Validate API key format + + Args: + api_key: API key to validate + min_length: Minimum required length (default: 16) + + Returns: + True if format is valid, False otherwise + """ + if not isinstance(api_key, str): + return False + + # Check minimum length + if len(api_key) < min_length: + return False + + # Check for obviously invalid patterns + invalid_patterns = [ + 'your_api_key_here', + 'placeholder', + 'example', + 'test_key', + '12345', + 'aaaa', + ] + + api_key_lower = api_key.lower() + if any(pattern in api_key_lower for pattern in invalid_patterns): + return False + + # Check that it's not all the same character + if len(set(api_key)) < 3: + return False + + return True + + +def validate_pubkey_format(pubkey: str, expected_length: int = 64) -> bool: + """ + Validate public key format (hex string) + + Args: + pubkey: Public key to validate + expected_length: Expected length in characters (default: 64 for ed25519) + + Returns: + True if format is valid, False otherwise + """ + if not isinstance(pubkey, str): + return False + + # Check exact length + if len(pubkey) != expected_length: + return False + + # Check hex format + if not re.match(r'^[0-9a-fA-F]+$', pubkey): + return False + + return True + + +def validate_port_number(port: int, allow_privileged: bool = False) -> bool: + """ + Validate port number + + Args: + port: Port number to validate + allow_privileged: Whether to allow privileged ports <1024 (default: False) + + Returns: + True if port is valid, False otherwise + """ + if not isinstance(port, int): + return False + + min_port = 1 if allow_privileged else 1024 + max_port = 65535 + + return min_port <= port <= max_port + + +def validate_integer_range(value: int, min_value: int, max_value: int, name: str = "value") -> bool: + """ + Validate integer is within range + + Args: + value: Integer to validate + min_value: Minimum allowed value (inclusive) + max_value: Maximum allowed value (inclusive) + name: Name of the value for error messages + + Returns: + True if valid + + Raises: + ValueError: If value is out of range + """ + if not isinstance(value, int): + raise ValueError(f"{name} must be an integer, got {type(value).__name__}") + + if value < min_value or value > max_value: + raise ValueError( + f"{name} must be between {min_value} and {max_value}, got {value}" + ) + + return True diff --git a/modules/web_viewer/integration.py b/modules/web_viewer/integration.py index e8ec6d1..0191b33 100644 --- a/modules/web_viewer/integration.py +++ b/modules/web_viewer/integration.py @@ -9,6 +9,7 @@ import time import subprocess import sys import os +import re from pathlib import Path class BotIntegration: @@ -247,6 +248,9 @@ class BotIntegration: class WebViewerIntegration: """Integration class for starting/stopping the web viewer with the bot""" + # Whitelist of allowed host bindings for security + ALLOWED_HOSTS = ['127.0.0.1', 'localhost', '0.0.0.0'] + def __init__(self, bot): self.bot = bot self.logger = bot.logger @@ -261,6 +265,9 @@ class WebViewerIntegration: self.debug = bot.config.getboolean('Web_Viewer', 'debug', fallback=False) self.auto_start = bot.config.getboolean('Web_Viewer', 'auto_start', fallback=False) + # Validate configuration for security + self._validate_config() + # Process monitoring self.restart_count = 0 self.max_restarts = 5 @@ -272,6 +279,32 @@ class WebViewerIntegration: if self.enabled and self.auto_start: self.start_viewer() + def _validate_config(self): + """Validate web viewer configuration for security""" + # Validate host against whitelist + if self.host not in self.ALLOWED_HOSTS: + raise ValueError( + f"Invalid host configuration: {self.host}. " + f"Allowed hosts: {', '.join(self.ALLOWED_HOSTS)}" + ) + + # Validate port range (avoid privileged ports) + if not isinstance(self.port, int) or not (1024 <= self.port <= 65535): + raise ValueError( + f"Port must be between 1024-65535 (non-privileged), got: {self.port}" + ) + + # Security warning for network exposure + if self.host == '0.0.0.0': + self.logger.warning( + "\n" + "="*70 + "\n" + "⚠️ SECURITY WARNING: Web viewer binding to all interfaces (0.0.0.0)\n" + "This exposes bot data (messages, contacts, routing) to your network\n" + "WITHOUT AUTHENTICATION. Ensure you have firewall protection!\n" + "For local-only access, use host=127.0.0.1 in config.\n" + + "="*70 + ) + def start_viewer(self): """Start the web viewer in a separate thread""" if self.running: @@ -327,12 +360,26 @@ class WebViewerIntegration: if result.returncode == 0 and result.stdout.strip(): pids = result.stdout.strip().split('\n') for pid in pids: - if pid.strip(): - try: - subprocess.run(['kill', '-9', pid.strip()], timeout=2) - self.logger.info(f"Killed remaining process {pid} on port {self.port}") - except Exception as e: - self.logger.warning(f"Failed to kill process {pid}: {e}") + pid = pid.strip() + if not pid: + continue + + # Validate PID is numeric only (prevent injection) + if not re.match(r'^\d+$', pid): + self.logger.warning(f"Invalid PID format: {pid}, skipping") + continue + + try: + pid_int = int(pid) + # Safety check: never kill system PIDs + if pid_int < 2: + self.logger.warning(f"Refusing to kill system PID: {pid}") + continue + + subprocess.run(['kill', '-9', str(pid_int)], timeout=2) + self.logger.info(f"Killed remaining process {pid} on port {self.port}") + except (ValueError, subprocess.TimeoutExpired) as e: + self.logger.warning(f"Failed to kill process {pid}: {e}") except Exception as e: self.logger.debug(f"Port cleanup check failed: {e}")