fixes: add DNS timeout, removed duplicate import, added OS-aware path validation, used centralized validation function for pubkey, made sanitize input length limit configurable, used explicit base directory, added a few type hints.

This commit is contained in:
agessaman
2025-12-09 19:58:42 -08:00
parent 3f8ae492f1
commit d80d808f02
4 changed files with 79 additions and 21 deletions
+5 -4
View File
@@ -10,6 +10,7 @@ from datetime import datetime
import pytz
import re
from ..models import MeshMessage
from ..security_utils import validate_pubkey_format
class BaseCommand(ABC):
@@ -417,15 +418,15 @@ class BaseCommand(ABC):
def _check_admin_access(self, message: MeshMessage) -> bool:
"""
Check if the message sender has admin access (security-hardened)
Check if the message sender has admin access
Security features:
- Strict pubkey format validation (64-char hex)
- No fallback to sender_id (prevents spoofing)
- Whitespace/empty config detection
- Normalized comparison (lowercase)
- Uses centralized validate_pubkey_format() function
"""
import re # This import is needed for re.match
if not hasattr(self.bot, 'config'):
return False
@@ -446,7 +447,7 @@ class BaseCommand(ABC):
continue
# Validate hex format (64 chars for ed25519 public keys)
if not re.match(r'^[0-9a-fA-F]{64}$', key):
if not validate_pubkey_format(key, expected_length=64):
self.logger.error(f"Invalid admin pubkey format in config: {key[:16]}...")
continue # Skip invalid keys but continue checking others
@@ -466,7 +467,7 @@ class BaseCommand(ABC):
return False
# Validate sender pubkey format
if not re.match(r'^[0-9a-fA-F]{64}$', sender_pubkey):
if not validate_pubkey_format(sender_pubkey, expected_length=64):
self.logger.warning(
f"Invalid sender pubkey format from {message.sender_id}: "
f"{sender_pubkey[:16]}... - admin access denied"
+9 -2
View File
@@ -45,6 +45,11 @@ class MeshCoreBot:
self.config_file = config_file
self.config = configparser.ConfigParser()
self.load_config()
@property
def bot_root(self) -> Path:
"""Get bot root directory (where config.ini is located)"""
return Path(self.config_file).parent.resolve()
# Setup logging
self.setup_logging()
@@ -60,8 +65,9 @@ class MeshCoreBot:
db_path = self.config.get('Bot', 'db_path', fallback='meshcore_bot.db')
# Validate database path for security (prevent path traversal)
# Use explicit bot root directory (where config.ini is located)
try:
db_path = str(validate_safe_path(db_path, base_dir='.', allow_absolute=False))
db_path = str(validate_safe_path(db_path, base_dir=str(self.bot_root), allow_absolute=False))
except ValueError as e:
self.logger.error(f"Invalid database path: {e}")
self.logger.error("Using default: meshcore_bot.db")
@@ -549,8 +555,9 @@ use_zulu_time = false
log_file = self.config.get('Logging', 'log_file', fallback='meshcore_bot.log')
# Validate log file path for security (prevent path traversal)
# Use explicit bot root directory (where config.ini is located)
try:
log_file = str(validate_safe_path(log_file, base_dir='.', allow_absolute=False))
log_file = str(validate_safe_path(log_file, base_dir=str(self.bot_root), allow_absolute=False))
except ValueError as e:
self.logger.warning(f"Invalid log file path: {e}")
self.logger.warning("Using default: meshcore_bot.log")
+3 -1
View File
@@ -273,8 +273,10 @@ class MessageHandler:
break
# Sanitize message content to prevent injection attacks
# Note: Firmware enforces 150-char limit at hardware level, so we disable length check
# but still strip control characters for security
message_content = payload.get('text', '')
message_content = sanitize_input(message_content, max_length=500, strip_controls=True)
message_content = sanitize_input(message_content, max_length=None, strip_controls=True)
# Convert to our message format
message = MeshMessage(
+62 -14
View File
@@ -7,6 +7,7 @@ Provides centralized security validation functions to prevent common attacks
import re
import ipaddress
import socket
import platform
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
@@ -15,13 +16,14 @@ import logging
logger = logging.getLogger('MeshCoreBot.Security')
def validate_external_url(url: str, allow_localhost: bool = False) -> bool:
def validate_external_url(url: str, allow_localhost: bool = False, timeout: float = 2.0) -> bool:
"""
Validate that URL points to safe external resource (SSRF protection)
Args:
url: URL to validate
allow_localhost: Whether to allow localhost/private IPs (default: False)
timeout: DNS resolution timeout in seconds (default: 2.0)
Returns:
True if URL is safe, False otherwise
@@ -42,9 +44,18 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool:
logger.warning(f"URL missing network location: {url}")
return False
# Resolve and check if IP is internal/private
# Resolve and check if IP is internal/private (with timeout)
try:
ip = socket.gethostbyname(parsed.hostname)
# Set socket timeout for DNS resolution
# Note: getdefaulttimeout() can return None (no timeout), which is valid
old_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try:
ip = socket.gethostbyname(parsed.hostname)
finally:
# Restore original timeout (None means no timeout, which is correct)
socket.setdefaulttimeout(old_timeout)
ip_obj = ipaddress.ip_address(ip)
# If localhost is not allowed, reject private/internal IPs
@@ -62,6 +73,9 @@ def validate_external_url(url: str, allow_localhost: bool = False) -> bool:
except socket.gaierror as e:
logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}")
return False
except socket.timeout:
logger.warning(f"DNS resolution timeout for {parsed.hostname}")
return False
return True
@@ -100,10 +114,37 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool
f"Path traversal detected: {file_path} is not within {base_dir}"
)
# Reject certain dangerous system paths
dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot']
target_str = str(target)
if any(target_str.startswith(prefix) for prefix in dangerous_prefixes):
# Reject certain dangerous system paths (OS-specific)
system = platform.system()
if system == 'Windows':
dangerous_prefixes = [
'C:\\Windows\\System32',
'C:\\Windows\\SysWOW64',
'C:\\Program Files',
'C:\\ProgramData',
'C:\\Windows\\System',
]
# Check against both forward and backslash paths
target_str = str(target).lower()
dangerous = any(target_str.startswith(prefix.lower()) for prefix in dangerous_prefixes)
elif system == 'Darwin': # macOS
dangerous_prefixes = [
'/System',
'/Library',
'/private',
'/usr/bin',
'/usr/sbin',
'/sbin',
'/bin',
]
target_str = str(target)
dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes)
else: # Linux and other Unix-like systems
dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot']
target_str = str(target)
dangerous = any(target_str.startswith(prefix) for prefix in dangerous_prefixes)
if dangerous:
raise ValueError(f"Access to system directory denied: {file_path}")
return target
@@ -112,25 +153,32 @@ def validate_safe_path(file_path: str, base_dir: str = '.', allow_absolute: bool
raise ValueError(f"Invalid or unsafe file path: {file_path} - {e}")
def sanitize_input(content: str, max_length: int = 500, strip_controls: bool = True) -> str:
def sanitize_input(content: str, max_length: Optional[int] = 500, strip_controls: bool = True) -> str:
"""
Sanitize user input to prevent injection attacks
Args:
content: Input string to sanitize
max_length: Maximum allowed length (default: 500 chars)
max_length: Maximum allowed length (default: 500 chars, None to disable length check)
strip_controls: Whether to remove control characters (default: True)
Returns:
Sanitized string
Raises:
ValueError: If max_length is negative
"""
if not isinstance(content, str):
content = str(content)
# Limit length to prevent DoS
if len(content) > max_length:
content = content[:max_length]
logger.debug(f"Input truncated to {max_length} characters")
# Validate max_length if provided
if max_length is not None:
if max_length < 0:
raise ValueError(f"max_length must be non-negative, got {max_length}")
# Limit length to prevent DoS
if len(content) > max_length:
content = content[:max_length]
logger.debug(f"Input truncated to {max_length} characters")
# Remove control characters except newline, carriage return, tab
if strip_controls:
@@ -254,4 +302,4 @@ def validate_integer_range(value: int, min_value: int, max_value: int, name: str
f"{name} must be between {min_value} and {max_value}, got {value}"
)
return True
return True