From 5a5c29992b21b18e8f7799b99c663847de5e9d97 Mon Sep 17 00:00:00 2001 From: eddieoz Date: Mon, 8 Dec 2025 23:37:41 +0200 Subject: [PATCH 1/3] feat: Add security utilities for path validation, SQL injection prevention, and input sanitization. --- config.ini.example | 41 +++-- modules/commands/base_command.py | 77 ++++++--- modules/core.py | 19 +++ modules/db_manager.py | 61 ++++++- modules/message_handler.py | 19 ++- modules/security_utils.py | 257 ++++++++++++++++++++++++++++++ modules/web_viewer/integration.py | 59 ++++++- 7 files changed, 483 insertions(+), 50 deletions(-) create mode 100644 modules/security_utils.py diff --git a/config.ini.example b/config.ini.example index 580ac45..3953276 100644 --- a/config.ini.example +++ b/config.ini.example @@ -126,13 +126,22 @@ translation_path = translations/ [Admin_ACL] # Admin Access Control List (ACL) for restricted commands # Only users with public keys listed here can execute admin commands -# Format: comma-separated list of public keys (without spaces) -# Example: 3e14a5fa9c52e7b6b8e14f987cd12ab1e3dc9fb64a2bbf0e19dca85e9f43a120, another key here +# +# SECURITY IMPORTANT: +# - Public keys MUST be exactly 64 hexadecimal characters (ed25519 format) +# - Invalid formats will be rejected with error logs +# - Empty or whitespace-only values disable admin access +# - Keys are case-insensitive (normalized to lowercase) +# +# Format: comma-separated list of 64-character hex public keys (without spaces) +# Example: f5d2b56d19b24412756933e917d4632e088cdd5daeadc9002feca73bf5d2b56d,1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef +# +# IMPORTANT: Leave blank to disable all admin commands. Set your actual admin pubkey(s) here. admin_pubkeys = # Commands that require admin access (comma-separated) # These commands will only work for users in the admin_pubkeys list -admin_commands = repeater,webviewer +admin_commands = repeater,webviewer,webviewer [Plugin_Overrides] # Plugin Overrides - Use alternative plugin implementations @@ -634,25 +643,37 @@ alert_enabled = false [Web_Viewer] # Enable or disable the web data viewer +# SECURITY NOTE: Web viewer has NO AUTHENTICATION built-in +# Only enable if you understand the security implications # true: web viewer is available for viewing bot data -# false: web viewer is disabled -enabled = true +# false: web viewer is disabled (RECOMMENDED for production) +enabled = false # Web viewer host address -# 127.0.0.1: Only accessible from localhost -# 0.0.0.0: Accessible from any network interface +# SECURITY WARNING: +# - 127.0.0.1: Only accessible from localhost (SECURE - recommended) +# - 0.0.0.0: Accessible from any network interface (INSECURE - exposes all bot data to network) +# +# Using 0.0.0.0 will expose: +# - All messages and their contents +# - Contact list and repeater information +# - Real-time packet stream +# - Bot configuration details +# +# WITHOUT ANY AUTHENTICATION OR ENCRYPTION host = 127.0.0.1 # Web viewer port -# Default: 8080 (viewer uses port 8080) +# Must be between 1024-65535 (non-privileged ports) +# Default: 8080 port = 8080 # Enable debug mode for the web viewer # true: Enable Flask debug mode (auto-reload on changes) -# false: Production mode +# false: Production mode (recommended) debug = false # Auto-start web viewer with bot # true: Start web viewer automatically when bot starts -# false: Start web viewer manually +# false: Start web viewer manually (recommended) auto_start = false \ No newline at end of file diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index 5e58e1a..77e5022 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -8,6 +8,7 @@ from abc import ABC, abstractmethod from typing import Optional, List, Dict, Any, Tuple from datetime import datetime import pytz +import re from ..models import MeshMessage @@ -415,51 +416,83 @@ class BaseCommand(ABC): return False def _check_admin_access(self, message: MeshMessage) -> bool: - """Check if the message sender has admin access""" + """ + Check if the message sender has admin access (security-hardened) + + Security features: + - Strict pubkey format validation (64-char hex) + - No fallback to sender_id (prevents spoofing) + - Whitespace/empty config detection + - Normalized comparison (lowercase) + """ + import re # This import is needed for re.match if not hasattr(self.bot, 'config'): return False try: # Get admin pubkeys from config admin_pubkeys = self.bot.config.get('Admin_ACL', 'admin_pubkeys', fallback='') - if not admin_pubkeys: - self.logger.warning("No admin pubkeys configured") + + # Check for empty or whitespace-only configuration + if not admin_pubkeys.strip(): + self.logger.warning("No admin pubkeys configured or empty/whitespace config") return False - # Parse admin pubkeys - admin_pubkey_list = [key.strip() for key in admin_pubkeys.split(',') if key.strip()] + # Parse and VALIDATE admin pubkeys + admin_pubkey_list = [] + for key in admin_pubkeys.split(','): + key = key.strip() + if not key: + continue + + # Validate hex format (64 chars for ed25519 public keys) + if not re.match(r'^[0-9a-fA-F]{64}$', key): + self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...") + continue # Skip invalid keys but continue checking others + + admin_pubkey_list.append(key.lower()) # Normalize to lowercase + if not admin_pubkey_list: - self.logger.warning("No valid admin pubkeys found in config") + self.logger.error("No valid admin pubkeys found in config after validation") return False - # Get sender's public key from message + # Get sender's public key - NEVER fall back to sender_id sender_pubkey = getattr(message, 'sender_pubkey', None) if not sender_pubkey: - # Try to get from sender_id if it's a pubkey - sender_pubkey = getattr(message, 'sender_id', None) - - if not sender_pubkey: - self.logger.warning(f"No sender public key found for message from {message.sender_id}") + self.logger.warning( + f"No sender public key available for {message.sender_id} - " + "admin access denied (missing pubkey)" + ) return False - # Check if sender's pubkey matches any admin key (exact match required for security) - is_admin = False - for admin_key in admin_pubkey_list: - # Only allow exact matches for security - if sender_pubkey == admin_key: - is_admin = True - break + # Validate sender pubkey format + if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey): + self.logger.warning( + f"Invalid sender pubkey format from {message.sender_id}: " + f"{sender_pubkey[:16]}... - admin access denied" + ) + return False + + # Normalize and compare + sender_pubkey_normalized = sender_pubkey.lower() + is_admin = sender_pubkey_normalized in admin_pubkey_list if not is_admin: - self.logger.info(f"Access denied for {message.sender_id} (pubkey: {sender_pubkey[:16]}...) - not in admin ACL") + self.logger.warning( + f"Access denied for {message.sender_id} " + f"(pubkey: {sender_pubkey[:16]}...) - not in admin ACL" + ) else: - self.logger.info(f"Admin access granted for {message.sender_id} (pubkey: {sender_pubkey[:16]}...)") + self.logger.info( + f"Admin access granted for {message.sender_id} " + f"(pubkey: {sender_pubkey[:16]}...)" + ) return is_admin except Exception as e: self.logger.error(f"Error checking admin access: {e}") - return False + return False # Fail securely def _strip_quotes_from_config(self, value: str) -> str: """Strip quotes from config values if present""" diff --git a/modules/core.py b/modules/core.py index 6d4ef80..99fbee4 100644 --- a/modules/core.py +++ b/modules/core.py @@ -35,6 +35,7 @@ from .db_manager import DBManager from .i18n import Translator from .solar_conditions import set_config from .web_viewer.integration import WebViewerIntegration +from .security_utils import validate_safe_path class MeshCoreBot: @@ -57,6 +58,15 @@ class MeshCoreBot: # Initialize database manager first (needed by plugins) db_path = self.config.get('Bot', 'db_path', fallback='meshcore_bot.db') + + # Validate database path for security (prevent path traversal) + try: + db_path = str(validate_safe_path(db_path, base_dir='.', allow_absolute=False)) + except ValueError as e: + self.logger.error(f"Invalid database path: {e}") + self.logger.error("Using default: meshcore_bot.db") + db_path = 'meshcore_bot.db' + self.logger.info(f"Initializing database manager with database: {db_path}") try: self.db_manager = DBManager(self, db_path) @@ -537,6 +547,15 @@ use_zulu_time = false # File handler log_file = self.config.get('Logging', 'log_file', fallback='meshcore_bot.log') + + # Validate log file path for security (prevent path traversal) + try: + log_file = str(validate_safe_path(log_file, base_dir='.', allow_absolute=False)) + except ValueError as e: + self.logger.warning(f"Invalid log file path: {e}") + self.logger.warning("Using default: meshcore_bot.log") + log_file = 'meshcore_bot.log' + file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) diff --git a/modules/db_manager.py b/modules/db_manager.py index 57db433..4b8527f 100644 --- a/modules/db_manager.py +++ b/modules/db_manager.py @@ -6,6 +6,7 @@ Provides common database operations and table management for the MeshCore Bot import sqlite3 import json +import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any from pathlib import Path @@ -14,6 +15,21 @@ from pathlib import Path class DBManager: """Generalized database manager for common operations""" + # Whitelist of allowed tables for security + ALLOWED_TABLES = { + 'geocoding_cache', + 'generic_cache', + 'bot_metadata', + 'packet_stream', + 'message_stats', + 'greeted_users', + 'repeater_contacts', + 'repeater_interactions', + 'complete_contact_tracking', # Repeater manager + 'daily_stats', # Repeater manager + 'purging_log', # Repeater manager + } + def __init__(self, bot, db_path: str = "meshcore_bot.db"): self.bot = bot self.logger = bot.logger @@ -94,13 +110,18 @@ class DBManager: def cache_geocoding(self, query: str, latitude: float, longitude: float, cache_hours: int = 720): """Cache geocoding result for future use (default: 30 days)""" try: + # Validate cache_hours to prevent SQL injection + if not isinstance(cache_hours, int) or cache_hours < 1 or cache_hours > 87600: # Max 10 years + raise ValueError(f"cache_hours must be an integer between 1 and 87600, got: {cache_hours}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Use parameter binding instead of string formatting cursor.execute(''' INSERT OR REPLACE INTO geocoding_cache (query, latitude, longitude, expires_at) - VALUES (?, ?, ?, datetime('now', '+{} hours')) - '''.format(cache_hours), (query, latitude, longitude)) + VALUES (?, ?, ?, datetime('now', '+' || ? || ' hours')) + ''', (query, latitude, longitude, cache_hours)) conn.commit() except Exception as e: self.logger.error(f"Error caching geocoding: {e}") @@ -126,13 +147,18 @@ class DBManager: def cache_value(self, cache_key: str, cache_value: str, cache_type: str, cache_hours: int = 24): """Cache a value for future use""" try: + # Validate cache_hours to prevent SQL injection + if not isinstance(cache_hours, int) or cache_hours < 1 or cache_hours > 87600: # Max 10 years + raise ValueError(f"cache_hours must be an integer between 1 and 87600, got: {cache_hours}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Use parameter binding instead of string formatting cursor.execute(''' INSERT OR REPLACE INTO generic_cache (cache_key, cache_value, cache_type, expires_at) - VALUES (?, ?, ?, datetime('now', '+{} hours')) - '''.format(cache_hours), (cache_key, cache_value, cache_type)) + VALUES (?, ?, ?, datetime('now', '+' || ? || ' hours')) + ''', (cache_key, cache_value, cache_type, cache_hours)) conn.commit() except Exception as e: self.logger.error(f"Error caching value: {e}") @@ -240,26 +266,49 @@ class DBManager: # Table management methods def create_table(self, table_name: str, schema: str): - """Create a custom table with the given schema""" + """Create a custom table with the given schema (whitelist-protected)""" try: + # Validate table name against whitelist + if table_name not in self.ALLOWED_TABLES: + raise ValueError(f"Table name '{table_name}' not in allowed tables whitelist") + + # Additional validation: ensure table name follows safe naming convention + if not re.match(r'^[a-z_][a-z0-9_]*$', table_name): + raise ValueError(f"Invalid table name format: {table_name}") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Table names cannot be parameterized, but we've validated against whitelist cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({schema})') conn.commit() self.logger.info(f"Created table: {table_name}") except Exception as e: self.logger.error(f"Error creating table {table_name}: {e}") + raise def drop_table(self, table_name: str): - """Drop a table (use with caution)""" + """Drop a table (whitelist-protected, use with extreme caution)""" try: + # Validate table name against whitelist + if table_name not in self.ALLOWED_TABLES: + raise ValueError(f"Table name '{table_name}' not in allowed tables whitelist") + + # Additional validation: ensure table name follows safe naming convention + if not re.match(r'^[a-z_][a-z0-9_]*$', table_name): + raise ValueError(f"Invalid table name format: {table_name}") + + # Extra safety: log critical action + self.logger.warning(f"CRITICAL: Dropping table '{table_name}'") + with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() + # Table names cannot be parameterized, but we've validated against whitelist cursor.execute(f'DROP TABLE IF EXISTS {table_name}') conn.commit() self.logger.info(f"Dropped table: {table_name}") except Exception as e: self.logger.error(f"Error dropping table {table_name}: {e}") + raise def execute_query(self, query: str, params: Tuple = ()) -> List[Dict]: """Execute a custom query and return results as list of dictionaries""" diff --git a/modules/message_handler.py b/modules/message_handler.py index 2aed5ab..68a43f4 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -14,6 +14,7 @@ from meshcore import EventType from .models import MeshMessage from .enums import PayloadType, PayloadVersion, RouteType, AdvertFlags, DeviceRole from .utils import calculate_packet_hash +from .security_utils import sanitize_input class MessageHandler: @@ -251,28 +252,34 @@ class MessageHandler: # Look up contact name from pubkey prefix sender_id = payload.get('pubkey_prefix', '') + sender_name = sender_id # Default to sender_id if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts: for contact_key, contact_data in self.bot.meshcore.contacts.items(): if contact_data.get('public_key', '').startswith(sender_id): # Use the contact name if available, otherwise use adv_name contact_name = contact_data.get('name', contact_data.get('adv_name', sender_id)) - sender_id = contact_name + sender_name = contact_name break # Get the full public key from contacts if available sender_pubkey = payload.get('pubkey_prefix', '') + sender_pubkey = sender_id # Default to sender_id if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts: for contact_key, contact_data in self.bot.meshcore.contacts.items(): - if contact_data.get('public_key', '').startswith(sender_pubkey): + if contact_data.get('public_key', '').startswith(sender_id): # Use the full public key from the contact - sender_pubkey = contact_data.get('public_key', sender_pubkey) - self.logger.debug(f"Found full public key for {sender_id}: {sender_pubkey[:16]}...") + sender_pubkey = contact_data.get('public_key', sender_id) + self.logger.debug(f"Found full public key for {sender_name}: {sender_pubkey[:16]}...") break + # Sanitize message content to prevent injection attacks + message_content = payload.get('text', '') + message_content = sanitize_input(message_content, max_length=500, strip_controls=True) + # Convert to our message format message = MeshMessage( - content=payload.get('text', ''), - sender_id=sender_id, + content=message_content, + sender_id=sender_name, sender_pubkey=sender_pubkey, is_dm=True, timestamp=timestamp, diff --git a/modules/security_utils.py b/modules/security_utils.py new file mode 100644 index 0000000..6caf703 --- /dev/null +++ b/modules/security_utils.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +Security Utilities for MeshCore Bot +Provides centralized security validation functions to prevent common attacks +""" + +import re +import ipaddress +import socket +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse +import logging + +logger = logging.getLogger('MeshCoreBot.Security') + + +def validate_external_url(url: str, allow_localhost: bool = False) -> bool: + """ + Validate that URL points to safe external resource (SSRF protection) + + Args: + url: URL to validate + allow_localhost: Whether to allow localhost/private IPs (default: False) + + Returns: + True if URL is safe, False otherwise + + Raises: + ValueError: If URL is invalid or unsafe + """ + try: + parsed = urlparse(url) + + # Only allow HTTP/HTTPS + if parsed.scheme not in ['http', 'https']: + logger.warning(f"URL scheme not allowed: {parsed.scheme}") + return False + + # Reject file:// and other dangerous schemes + if not parsed.netloc: + logger.warning(f"URL missing network location: {url}") + return False + + # Resolve and check if IP is internal/private + try: + ip = socket.gethostbyname(parsed.hostname) + ip_obj = ipaddress.ip_address(ip) + + # If localhost is not allowed, reject private/internal IPs + if not allow_localhost: + # Reject private/internal IPs + if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local: + logger.warning(f"URL resolves to private/internal IP: {ip}") + return False + + # Reject reserved ranges + if ip_obj.is_reserved or ip_obj.is_multicast: + logger.warning(f"URL resolves to reserved/multicast IP: {ip}") + return False + + except socket.gaierror as e: + logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") + return False + + return True + + except Exception as e: + logger.error(f"URL validation failed: {e}") + return False + + +def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool = False) -> Path: + """ + Validate that path is safe and within base directory (path traversal protection) + + Args: + file_path: Path to validate + base_dir: Base directory that path must be within (default: current dir) + allow_absolute: Whether to allow absolute paths outside base_dir + + Returns: + Resolved Path object if safe + + Raises: + ValueError: If path is unsafe or attempts traversal + """ + try: + # Resolve absolute paths + base = Path(base_dir).resolve() + target = Path(file_path).resolve() + + # If absolute paths are not allowed, ensure target is within base + if not allow_absolute: + # Check if target is within base directory + try: + target.relative_to(base) + except ValueError: + raise ValueError( + f"Path traversal detected: {file_path} is not within {base_dir}" + ) + + # Reject certain dangerous system paths + dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] + target_str = str(target) + if any(target_str.startswith(prefix) for prefix in dangerous_prefixes): + raise ValueError(f"Access to system directory denied: {file_path}") + + return target + + except Exception as e: + raise ValueError(f"Invalid or unsafe file path: {file_path} - {e}") + + +def sanitize_input(content: str, max_length: int = 500, strip_controls: bool = True) -> str: + """ + Sanitize user input to prevent injection attacks + + Args: + content: Input string to sanitize + max_length: Maximum allowed length (default: 500 chars) + strip_controls: Whether to remove control characters (default: True) + + Returns: + Sanitized string + """ + if not isinstance(content, str): + content = str(content) + + # Limit length to prevent DoS + if len(content) > max_length: + content = content[:max_length] + logger.debug(f"Input truncated to {max_length} characters") + + # Remove control characters except newline, carriage return, tab + if strip_controls: + # Keep only printable characters plus common whitespace + content = ''.join( + char for char in content + if ord(char) >= 32 or char in '\n\r\t' + ) + + # Remove null bytes (can cause issues in C libraries) + content = content.replace('\x00', '') + + return content.strip() + + +def validate_api_key_format(api_key: str, min_length: int = 16) -> bool: + """ + Validate API key format + + Args: + api_key: API key to validate + min_length: Minimum required length (default: 16) + + Returns: + True if format is valid, False otherwise + """ + if not isinstance(api_key, str): + return False + + # Check minimum length + if len(api_key) < min_length: + return False + + # Check for obviously invalid patterns + invalid_patterns = [ + 'your_api_key_here', + 'placeholder', + 'example', + 'test_key', + '12345', + 'aaaa', + ] + + api_key_lower = api_key.lower() + if any(pattern in api_key_lower for pattern in invalid_patterns): + return False + + # Check that it's not all the same character + if len(set(api_key)) < 3: + return False + + return True + + +def validate_pubkey_format(pubkey: str, expected_length: int = 64) -> bool: + """ + Validate public key format (hex string) + + Args: + pubkey: Public key to validate + expected_length: Expected length in characters (default: 64 for ed25519) + + Returns: + True if format is valid, False otherwise + """ + if not isinstance(pubkey, str): + return False + + # Check exact length + if len(pubkey) != expected_length: + return False + + # Check hex format + if not re.match(r'^[0-9a-fA-F]+$', pubkey): + return False + + return True + + +def validate_port_number(port: int, allow_privileged: bool = False) -> bool: + """ + Validate port number + + Args: + port: Port number to validate + allow_privileged: Whether to allow privileged ports <1024 (default: False) + + Returns: + True if port is valid, False otherwise + """ + if not isinstance(port, int): + return False + + min_port = 1 if allow_privileged else 1024 + max_port = 65535 + + return min_port <= port <= max_port + + +def validate_integer_range(value: int, min_value: int, max_value: int, name: str = "value") -> bool: + """ + Validate integer is within range + + Args: + value: Integer to validate + min_value: Minimum allowed value (inclusive) + max_value: Maximum allowed value (inclusive) + name: Name of the value for error messages + + Returns: + True if valid + + Raises: + ValueError: If value is out of range + """ + if not isinstance(value, int): + raise ValueError(f"{name} must be an integer, got {type(value).__name__}") + + if value < min_value or value > max_value: + raise ValueError( + f"{name} must be between {min_value} and {max_value}, got {value}" + ) + + return True diff --git a/modules/web_viewer/integration.py b/modules/web_viewer/integration.py index e8ec6d1..0191b33 100644 --- a/modules/web_viewer/integration.py +++ b/modules/web_viewer/integration.py @@ -9,6 +9,7 @@ import time import subprocess import sys import os +import re from pathlib import Path class BotIntegration: @@ -247,6 +248,9 @@ class BotIntegration: class WebViewerIntegration: """Integration class for starting/stopping the web viewer with the bot""" + # Whitelist of allowed host bindings for security + ALLOWED_HOSTS = ['127.0.0.1', 'localhost', '0.0.0.0'] + def __init__(self, bot): self.bot = bot self.logger = bot.logger @@ -261,6 +265,9 @@ class WebViewerIntegration: self.debug = bot.config.getboolean('Web_Viewer', 'debug', fallback=False) self.auto_start = bot.config.getboolean('Web_Viewer', 'auto_start', fallback=False) + # Validate configuration for security + self._validate_config() + # Process monitoring self.restart_count = 0 self.max_restarts = 5 @@ -272,6 +279,32 @@ class WebViewerIntegration: if self.enabled and self.auto_start: self.start_viewer() + def _validate_config(self): + """Validate web viewer configuration for security""" + # Validate host against whitelist + if self.host not in self.ALLOWED_HOSTS: + raise ValueError( + f"Invalid host configuration: {self.host}. " + f"Allowed hosts: {', '.join(self.ALLOWED_HOSTS)}" + ) + + # Validate port range (avoid privileged ports) + if not isinstance(self.port, int) or not (1024 <= self.port <= 65535): + raise ValueError( + f"Port must be between 1024-65535 (non-privileged), got: {self.port}" + ) + + # Security warning for network exposure + if self.host == '0.0.0.0': + self.logger.warning( + "\n" + "="*70 + "\n" + "⚠️ SECURITY WARNING: Web viewer binding to all interfaces (0.0.0.0)\n" + "This exposes bot data (messages, contacts, routing) to your network\n" + "WITHOUT AUTHENTICATION. Ensure you have firewall protection!\n" + "For local-only access, use host=127.0.0.1 in config.\n" + + "="*70 + ) + def start_viewer(self): """Start the web viewer in a separate thread""" if self.running: @@ -327,12 +360,26 @@ class WebViewerIntegration: if result.returncode == 0 and result.stdout.strip(): pids = result.stdout.strip().split('\n') for pid in pids: - if pid.strip(): - try: - subprocess.run(['kill', '-9', pid.strip()], timeout=2) - self.logger.info(f"Killed remaining process {pid} on port {self.port}") - except Exception as e: - self.logger.warning(f"Failed to kill process {pid}: {e}") + pid = pid.strip() + if not pid: + continue + + # Validate PID is numeric only (prevent injection) + if not re.match(r'^\d+$', pid): + self.logger.warning(f"Invalid PID format: {pid}, skipping") + continue + + try: + pid_int = int(pid) + # Safety check: never kill system PIDs + if pid_int < 2: + self.logger.warning(f"Refusing to kill system PID: {pid}") + continue + + subprocess.run(['kill', '-9', str(pid_int)], timeout=2) + self.logger.info(f"Killed remaining process {pid} on port {self.port}") + except (ValueError, subprocess.TimeoutExpired) as e: + self.logger.warning(f"Failed to kill process {pid}: {e}") except Exception as e: self.logger.debug(f"Port cleanup check failed: {e}") From 3f8ae492f16a5995d50b7991135689db831fc0c5 Mon Sep 17 00:00:00 2001 From: Edilson Osorio Jr <3277320+eddieoz@users.noreply.github.com> Date: Tue, 9 Dec 2025 17:26:59 +0200 Subject: [PATCH 2/3] Fix duplicate entry in admin_command --- config.ini.example | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.ini.example b/config.ini.example index 3953276..8ba4871 100644 --- a/config.ini.example +++ b/config.ini.example @@ -141,7 +141,7 @@ admin_pubkeys = # Commands that require admin access (comma-separated) # These commands will only work for users in the admin_pubkeys list -admin_commands = repeater,webviewer,webviewer +admin_commands = repeater,webviewer [Plugin_Overrides] # Plugin Overrides - Use alternative plugin implementations @@ -676,4 +676,4 @@ debug = false # Auto-start web viewer with bot # true: Start web viewer automatically when bot starts # false: Start web viewer manually (recommended) -auto_start = false \ No newline at end of file +auto_start = false From d80d808f02693bcaa02c82fbddd095e473df939c Mon Sep 17 00:00:00 2001 From: agessaman Date: Tue, 9 Dec 2025 19:58:42 -0800 Subject: [PATCH 3/3] fixes: add DNS timeout, removed duplicate import, added OS-aware path validation, used centralized validation function for pubkey, made sanitize input length limit configurable, used explicit base directory, added a few type hints. --- modules/commands/base_command.py | 9 ++-- modules/core.py | 11 ++++- modules/message_handler.py | 4 +- modules/security_utils.py | 76 ++++++++++++++++++++++++++------ 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index 77e5022..bede7d0 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -10,6 +10,7 @@ from datetime import datetime import pytz import re from ..models import MeshMessage +from ..security_utils import validate_pubkey_format class BaseCommand(ABC): @@ -417,15 +418,15 @@ class BaseCommand(ABC): def _check_admin_access(self, message: MeshMessage) -> bool: """ - Check if the message sender has admin access (security-hardened) + Check if the message sender has admin access Security features: - Strict pubkey format validation (64-char hex) - No fallback to sender_id (prevents spoofing) - Whitespace/empty config detection - Normalized comparison (lowercase) + - Uses centralized validate_pubkey_format() function """ - import re # This import is needed for re.match if not hasattr(self.bot, 'config'): return False @@ -446,7 +447,7 @@ class BaseCommand(ABC): continue # Validate hex format (64 chars for ed25519 public keys) - if not re.match(r'^[0-9a-fA-F]{64}$', key): + if not validate_pubkey_format(key, expected_length=64): self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...") continue # Skip invalid keys but continue checking others @@ -466,7 +467,7 @@ class BaseCommand(ABC): return False # Validate sender pubkey format - if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey): + if not validate_pubkey_format(sender_pubkey, expected_length=64): self.logger.warning( f"Invalid sender pubkey format from {message.sender_id}: " f"{sender_pubkey[:16]}... - admin access denied" diff --git a/modules/core.py b/modules/core.py index 99fbee4..96d7858 100644 --- a/modules/core.py +++ b/modules/core.py @@ -45,6 +45,11 @@ class MeshCoreBot: self.config_file = config_file self.config = configparser.ConfigParser() self.load_config() + + @property + def bot_root(self) -> Path: + """Get bot root directory (where config.ini is located)""" + return Path(self.config_file).parent.resolve() # Setup logging self.setup_logging() @@ -60,8 +65,9 @@ class MeshCoreBot: db_path = self.config.get('Bot', 'db_path', fallback='meshcore_bot.db') # Validate database path for security (prevent path traversal) + # Use explicit bot root directory (where config.ini is located) try: - db_path = str(validate_safe_path(db_path, base_dir='.', allow_absolute=False)) + db_path = str(validate_safe_path(db_path, base_dir=str(self.bot_root), allow_absolute=False)) except ValueError as e: self.logger.error(f"Invalid database path: {e}") self.logger.error("Using default: meshcore_bot.db") @@ -549,8 +555,9 @@ use_zulu_time = false log_file = self.config.get('Logging', 'log_file', fallback='meshcore_bot.log') # Validate log file path for security (prevent path traversal) + # Use explicit bot root directory (where config.ini is located) try: - log_file = str(validate_safe_path(log_file, base_dir='.', allow_absolute=False)) + log_file = str(validate_safe_path(log_file, base_dir=str(self.bot_root), allow_absolute=False)) except ValueError as e: self.logger.warning(f"Invalid log file path: {e}") self.logger.warning("Using default: meshcore_bot.log") diff --git a/modules/message_handler.py b/modules/message_handler.py index 68a43f4..886aad5 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -273,8 +273,10 @@ class MessageHandler: break # Sanitize message content to prevent injection attacks + # Note: Firmware enforces 150-char limit at hardware level, so we disable length check + # but still strip control characters for security message_content = payload.get('text', '') - message_content = sanitize_input(message_content, max_length=500, strip_controls=True) + message_content = sanitize_input(message_content, max_length=None, strip_controls=True) # Convert to our message format message = MeshMessage( diff --git a/modules/security_utils.py b/modules/security_utils.py index 6caf703..08a50ea 100644 --- a/modules/security_utils.py +++ b/modules/security_utils.py @@ -7,6 +7,7 @@ Provides centralized security validation functions to prevent common attacks import re import ipaddress import socket +import platform from pathlib import Path from typing import Optional from urllib.parse import urlparse @@ -15,13 +16,14 @@ import logging logger = logging.getLogger('MeshCoreBot.Security') -def validate_external_url(url: str, allow_localhost: bool = False) -> bool: +def validate_external_url(url: str, allow_localhost: bool = False, timeout: float = 2.0) -> bool: """ Validate that URL points to safe external resource (SSRF protection) Args: url: URL to validate allow_localhost: Whether to allow localhost/private IPs (default: False) + timeout: DNS resolution timeout in seconds (default: 2.0) Returns: True if URL is safe, False otherwise @@ -42,9 +44,18 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool: logger.warning(f"URL missing network location: {url}") return False - # Resolve and check if IP is internal/private + # Resolve and check if IP is internal/private (with timeout) try: - ip = socket.gethostbyname(parsed.hostname) + # Set socket timeout for DNS resolution + # Note: getdefaulttimeout() can return None (no timeout), which is valid + old_timeout = socket.getdefaulttimeout() + socket.setdefaulttimeout(timeout) + try: + ip = socket.gethostbyname(parsed.hostname) + finally: + # Restore original timeout (None means no timeout, which is correct) + socket.setdefaulttimeout(old_timeout) + ip_obj = ipaddress.ip_address(ip) # If localhost is not allowed, reject private/internal IPs @@ -62,6 +73,9 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool: except socket.gaierror as e: logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") return False + except socket.timeout: + logger.warning(f"DNS resolution timeout for {parsed.hostname}") + return False return True @@ -100,10 +114,37 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool f"Path traversal detected: {file_path} is not within {base_dir}" ) - # Reject certain dangerous system paths - dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] - target_str = str(target) - if any(target_str.startswith(prefix) for prefix in dangerous_prefixes): + # Reject certain dangerous system paths (OS-specific) + system = platform.system() + if system == 'Windows': + dangerous_prefixes = [ + 'C:\\Windows\\System32', + 'C:\\Windows\\SysWOW64', + 'C:\\Program Files', + 'C:\\ProgramData', + 'C:\\Windows\\System', + ] + # Check against both forward and backslash paths + target_str = str(target).lower() + dangerous = any(target_str.startswith(prefix.lower()) for prefix in dangerous_prefixes) + elif system == 'Darwin': # macOS + dangerous_prefixes = [ + '/System', + '/Library', + '/private', + '/usr/bin', + '/usr/sbin', + '/sbin', + '/bin', + ] + target_str = str(target) + dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes) + else: # Linux and other Unix-like systems + dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] + target_str = str(target) + dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes) + + if dangerous: raise ValueError(f"Access to system directory denied: {file_path}") return target @@ -112,25 +153,32 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool raise ValueError(f"Invalid or unsafe file path: {file_path} - {e}") -def sanitize_input(content: str, max_length: int = 500, strip_controls: bool = True) -> str: +def sanitize_input(content: str, max_length: Optional[int] = 500, strip_controls: bool = True) -> str: """ Sanitize user input to prevent injection attacks Args: content: Input string to sanitize - max_length: Maximum allowed length (default: 500 chars) + max_length: Maximum allowed length (default: 500 chars, None to disable length check) strip_controls: Whether to remove control characters (default: True) Returns: Sanitized string + + Raises: + ValueError: If max_length is negative """ if not isinstance(content, str): content = str(content) - # Limit length to prevent DoS - if len(content) > max_length: - content = content[:max_length] - logger.debug(f"Input truncated to {max_length} characters") + # Validate max_length if provided + if max_length is not None: + if max_length < 0: + raise ValueError(f"max_length must be non-negative, got {max_length}") + # Limit length to prevent DoS + if len(content) > max_length: + content = content[:max_length] + logger.debug(f"Input truncated to {max_length} characters") # Remove control characters except newline, carriage return, tab if strip_controls: @@ -254,4 +302,4 @@ def validate_integer_range(value: int, min_value: int, max_value: int, name: str f"{name} must be between {min_value} and {max_value}, got {value}" ) - return True + return True \ No newline at end of file