From d80d808f02693bcaa02c82fbddd095e473df939c Mon Sep 17 00:00:00 2001 From: agessaman Date: Tue, 9 Dec 2025 19:58:42 -0800 Subject: [PATCH] fixes: add DNS timeout, removed duplicate import, added OS-aware path validation, used centralized validation function for pubkey, made sanitize input length limit configurable, used explicit base directory, added a few type hints. --- modules/commands/base_command.py | 9 ++-- modules/core.py | 11 ++++- modules/message_handler.py | 4 +- modules/security_utils.py | 76 ++++++++++++++++++++++++++------ 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/modules/commands/base_command.py b/modules/commands/base_command.py index 77e5022..bede7d0 100644 --- a/modules/commands/base_command.py +++ b/modules/commands/base_command.py @@ -10,6 +10,7 @@ from datetime import datetime import pytz import re from ..models import MeshMessage +from ..security_utils import validate_pubkey_format class BaseCommand(ABC): @@ -417,15 +418,15 @@ class BaseCommand(ABC): def _check_admin_access(self, message: MeshMessage) -> bool: """ - Check if the message sender has admin access (security-hardened) + Check if the message sender has admin access Security features: - Strict pubkey format validation (64-char hex) - No fallback to sender_id (prevents spoofing) - Whitespace/empty config detection - Normalized comparison (lowercase) + - Uses centralized validate_pubkey_format() function """ - import re # This import is needed for re.match if not hasattr(self.bot, 'config'): return False @@ -446,7 +447,7 @@ class BaseCommand(ABC): continue # Validate hex format (64 chars for ed25519 public keys) - if not re.match(r'^[0-9a-fA-F]{64}$', key): + if not validate_pubkey_format(key, expected_length=64): self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...") continue # Skip invalid keys but continue checking others @@ -466,7 +467,7 @@ class BaseCommand(ABC): return False # Validate sender pubkey format - if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey): + if not validate_pubkey_format(sender_pubkey, expected_length=64): self.logger.warning( f"Invalid sender pubkey format from {message.sender_id}: " f"{sender_pubkey[:16]}... - admin access denied" diff --git a/modules/core.py b/modules/core.py index 99fbee4..96d7858 100644 --- a/modules/core.py +++ b/modules/core.py @@ -45,6 +45,11 @@ class MeshCoreBot: self.config_file = config_file self.config = configparser.ConfigParser() self.load_config() + + @property + def bot_root(self) -> Path: + """Get bot root directory (where config.ini is located)""" + return Path(self.config_file).parent.resolve() # Setup logging self.setup_logging() @@ -60,8 +65,9 @@ class MeshCoreBot: db_path = self.config.get('Bot', 'db_path', fallback='meshcore_bot.db') # Validate database path for security (prevent path traversal) + # Use explicit bot root directory (where config.ini is located) try: - db_path = str(validate_safe_path(db_path, base_dir='.', allow_absolute=False)) + db_path = str(validate_safe_path(db_path, base_dir=str(self.bot_root), allow_absolute=False)) except ValueError as e: self.logger.error(f"Invalid database path: {e}") self.logger.error("Using default: meshcore_bot.db") @@ -549,8 +555,9 @@ use_zulu_time = false log_file = self.config.get('Logging', 'log_file', fallback='meshcore_bot.log') # Validate log file path for security (prevent path traversal) + # Use explicit bot root directory (where config.ini is located) try: - log_file = str(validate_safe_path(log_file, base_dir='.', allow_absolute=False)) + log_file = str(validate_safe_path(log_file, base_dir=str(self.bot_root), allow_absolute=False)) except ValueError as e: self.logger.warning(f"Invalid log file path: {e}") self.logger.warning("Using default: meshcore_bot.log") diff --git a/modules/message_handler.py b/modules/message_handler.py index 68a43f4..886aad5 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -273,8 +273,10 @@ class MessageHandler: break # Sanitize message content to prevent injection attacks + # Note: Firmware enforces 150-char limit at hardware level, so we disable length check + # but still strip control characters for security message_content = payload.get('text', '') - message_content = sanitize_input(message_content, max_length=500, strip_controls=True) + message_content = sanitize_input(message_content, max_length=None, strip_controls=True) # Convert to our message format message = MeshMessage( diff --git a/modules/security_utils.py b/modules/security_utils.py index 6caf703..08a50ea 100644 --- a/modules/security_utils.py +++ b/modules/security_utils.py @@ -7,6 +7,7 @@ Provides centralized security validation functions to prevent common attacks import re import ipaddress import socket +import platform from pathlib import Path from typing import Optional from urllib.parse import urlparse @@ -15,13 +16,14 @@ import logging logger = logging.getLogger('MeshCoreBot.Security') -def validate_external_url(url: str, allow_localhost: bool = False) -> bool: +def validate_external_url(url: str, allow_localhost: bool = False, timeout: float = 2.0) -> bool: """ Validate that URL points to safe external resource (SSRF protection) Args: url: URL to validate allow_localhost: Whether to allow localhost/private IPs (default: False) + timeout: DNS resolution timeout in seconds (default: 2.0) Returns: True if URL is safe, False otherwise @@ -42,9 +44,18 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool: logger.warning(f"URL missing network location: {url}") return False - # Resolve and check if IP is internal/private + # Resolve and check if IP is internal/private (with timeout) try: - ip = socket.gethostbyname(parsed.hostname) + # Set socket timeout for DNS resolution + # Note: getdefaulttimeout() can return None (no timeout), which is valid + old_timeout = socket.getdefaulttimeout() + socket.setdefaulttimeout(timeout) + try: + ip = socket.gethostbyname(parsed.hostname) + finally: + # Restore original timeout (None means no timeout, which is correct) + socket.setdefaulttimeout(old_timeout) + ip_obj = ipaddress.ip_address(ip) # If localhost is not allowed, reject private/internal IPs @@ -62,6 +73,9 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool: except socket.gaierror as e: logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") return False + except socket.timeout: + logger.warning(f"DNS resolution timeout for {parsed.hostname}") + return False return True @@ -100,10 +114,37 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool f"Path traversal detected: {file_path} is not within {base_dir}" ) - # Reject certain dangerous system paths - dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] - target_str = str(target) - if any(target_str.startswith(prefix) for prefix in dangerous_prefixes): + # Reject certain dangerous system paths (OS-specific) + system = platform.system() + if system == 'Windows': + dangerous_prefixes = [ + 'C:\\Windows\\System32', + 'C:\\Windows\\SysWOW64', + 'C:\\Program Files', + 'C:\\ProgramData', + 'C:\\Windows\\System', + ] + # Check against both forward and backslash paths + target_str = str(target).lower() + dangerous = any(target_str.startswith(prefix.lower()) for prefix in dangerous_prefixes) + elif system == 'Darwin': # macOS + dangerous_prefixes = [ + '/System', + '/Library', + '/private', + '/usr/bin', + '/usr/sbin', + '/sbin', + '/bin', + ] + target_str = str(target) + dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes) + else: # Linux and other Unix-like systems + dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] + target_str = str(target) + dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes) + + if dangerous: raise ValueError(f"Access to system directory denied: {file_path}") return target @@ -112,25 +153,32 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool raise ValueError(f"Invalid or unsafe file path: {file_path} - {e}") -def sanitize_input(content: str, max_length: int = 500, strip_controls: bool = True) -> str: +def sanitize_input(content: str, max_length: Optional[int] = 500, strip_controls: bool = True) -> str: """ Sanitize user input to prevent injection attacks Args: content: Input string to sanitize - max_length: Maximum allowed length (default: 500 chars) + max_length: Maximum allowed length (default: 500 chars, None to disable length check) strip_controls: Whether to remove control characters (default: True) Returns: Sanitized string + + Raises: + ValueError: If max_length is negative """ if not isinstance(content, str): content = str(content) - # Limit length to prevent DoS - if len(content) > max_length: - content = content[:max_length] - logger.debug(f"Input truncated to {max_length} characters") + # Validate max_length if provided + if max_length is not None: + if max_length < 0: + raise ValueError(f"max_length must be non-negative, got {max_length}") + # Limit length to prevent DoS + if len(content) > max_length: + content = content[:max_length] + logger.debug(f"Input truncated to {max_length} characters") # Remove control characters except newline, carriage return, tab if strip_controls: @@ -254,4 +302,4 @@ def validate_integer_range(value: int, min_value: int, max_value: int, name: str f"{name} must be between {min_value} and {max_value}, got {value}" ) - return True + return True \ No newline at end of file