From 54aeb28bf01cdd1f9fffa7de9830f29d3527fc82 Mon Sep 17 00:00:00 2001 From: Stacy Olivas Date: Thu, 9 Apr 2026 23:45:43 -0700 Subject: [PATCH] security: SSRF hardening, log injection sanitization, and allow_local_smtp Add SSRF host validation to maintenance.py send_nightly_email and scheduler.py send_zombie_alert_email using validate_external_url(). New allow_local_smtp config key permits private-IP SMTP for local relay setups. Add sanitize_name() to security_utils and apply it to all log calls in message_handler, repeater_manager, path_command, solarforecast_command, command_manager, and discord_bridge_service to prevent log injection. Move nightly email logic from duplicate scheduler._send_nightly_email() into the canonical maintenance.py implementation, removing the duplicate. Update tests to call maintenance.send_nightly_email() directly. Add validate_external_url allow_private parameter with support for loopback, RFC1918, CGN, and link-local address ranges. --- config.ini.example | 18 ++ modules/command_manager.py | 12 +- modules/commands/feed_command.py | 24 +-- modules/commands/path_command.py | 33 +-- modules/commands/solarforecast_command.py | 5 +- modules/feed_manager.py | 13 +- modules/maintenance.py | 10 + modules/message_handler.py | 8 +- modules/profanity_filter.py | 16 +- modules/repeater_manager.py | 7 +- modules/scheduler.py | 126 ++++++++++- modules/security_utils.py | 61 +++++- .../service_plugins/discord_bridge_service.py | 5 +- modules/web_viewer/app.py | 49 ++++- tests/test_core.py | 3 + tests/test_scheduler_logic.py | 103 ++++++++- tests/test_security_utils.py | 94 +++++++- tests/test_web_viewer.py | 200 ++++++++++++++++++ 18 files changed, 707 insertions(+), 80 deletions(-) diff --git a/config.ini.example b/config.ini.example index 72f1c4c..3949490 100644 --- a/config.ini.example +++ b/config.ini.example @@ -202,6 +202,24 @@ radio_probe_fail_threshold = 3 # true: send alert email in addition to logging radio_zombie_alert_enabled = false +<<<<<<< HEAD +======= +# SMTP security note: by default the bot blocks SMTP hosts that resolve to +# any private, loopback, link-local, or reserved address to prevent SSRF. +# Blocked ranges (per RFC): +# 10.0.0.0/8 RFC 1918 — private +# 172.16.0.0/12 RFC 1918 — private +# 192.168.0.0/16 RFC 1918 — private +# 127.0.0.0/8 RFC 1122 — loopback +# 169.254.0.0/16 RFC 3927 — link-local +# 100.64.0.0/10 RFC 6598 — shared address space (CGN) +# ::1/128 RFC 4291 — IPv6 loopback +# fc00::/7 RFC 4193 — IPv6 unique local (ULA) +# fe80::/10 RFC 4291 — IPv6 link-local +# If you run a local SMTP relay (e.g. Postfix on localhost), enable +# "Allow local SMTP host" in the web viewer → Settings → Notifications. + +>>>>>>> 38d040a (security: SSRF hardening, log injection sanitization, and allow_local_smtp) # Alert email recipients for zombie detection (comma-separated addresses). # If empty, falls back to the nightly maintenance email recipients. # Set this to a separate on-call address if needed. diff --git a/modules/command_manager.py b/modules/command_manager.py index e34c338..396f4db 100644 --- a/modules/command_manager.py +++ b/modules/command_manager.py @@ -22,6 +22,7 @@ from .config_validation import ( ) from .models import CHANNEL_REGIONAL_FLOOD_SCOPE_BODY_OVERHEAD, MeshMessage from .plugin_loader import PluginLoader +from .security_utils import sanitize_name, validate_safe_path from .utils import check_internet_connectivity_async, decode_escape_sequences, format_keyword_response_with_placeholders @@ -857,6 +858,15 @@ class CommandManager: self.logger.warning(f"RandomLine matched '{key}' but missing config file.{key}") return None + try: + validated_path = validate_safe_path(file_path, allow_absolute=True) + except ValueError: + validated_path = None + if validated_path is None: + self.logger.warning(f"RandomLine: unsafe or restricted path rejected for '{key}': {file_path}") + return None + file_path = str(validated_path) + # Read usable lines try: with open(file_path, encoding="utf-8") as f: @@ -952,7 +962,7 @@ class CommandManager: # Use the contact name for logging contact_name = contact.get('name', contact.get('adv_name', recipient_id)) - self.logger.info(f"Sending DM to {contact_name}: {content}") + self.logger.info("Sending DM to %s", sanitize_name(contact_name)) # Record transmission for repeat tracking (don't let this block sending) try: diff --git a/modules/commands/feed_command.py b/modules/commands/feed_command.py index 400bd44..8bce220 100644 --- a/modules/commands/feed_command.py +++ b/modules/commands/feed_command.py @@ -6,9 +6,9 @@ Handles RSS and API feed subscription management import json from typing import Optional -from urllib.parse import urlparse from ..models import MeshMessage +from ..security_utils import sanitize_input, sanitize_name, validate_external_url from .base_command import BaseCommand @@ -103,13 +103,13 @@ feed status 1""" return await self.send_response(message, "Feed type must be 'rss' or 'api'") feed_url = args[1] - channel_name = args[2] - feed_name = args[3] if len(args) > 3 else None + channel_name = sanitize_name(args[2], max_length=64) + feed_name = sanitize_input(args[3], max_length=100) if len(args) > 3 else None api_config = args[4] if len(args) > 4 and feed_type == 'api' else None - # Validate URL - if not self._validate_url(feed_url): - return await self.send_response(message, "Invalid URL format") + # Validate URL — full SSRF protection (IP range + DNS check) + if not validate_external_url(feed_url): + return await self.send_response(message, "Invalid or unsafe URL") # Validate channel exists channel_num = self.bot.channel_manager.get_channel_number(channel_name) @@ -242,8 +242,8 @@ feed status 1""" feed_url = args[0] - if not self._validate_url(feed_url): - return await self.send_response(message, "Invalid URL format") + if not validate_external_url(feed_url): + return await self.send_response(message, "Invalid or unsafe URL") # Test would require feed_manager to be available # For now, just validate URL @@ -295,14 +295,6 @@ feed status 1""" self.logger.error(f"Error updating feed: {e}") return await self.send_response(message, f"Error: {str(e)}") - def _validate_url(self, url: str) -> bool: - """Validate URL format""" - try: - result = urlparse(url) - return all([result.scheme in ['http', 'https'], result.netloc]) - except Exception: - return False - def _create_subscription(self, feed_type: str, feed_url: str, channel_name: str, feed_name: Optional[str] = None, api_config: Optional[dict] = None) -> int: """Create a new feed subscription""" diff --git a/modules/commands/path_command.py b/modules/commands/path_command.py index ce66ad3..1f7416a 100644 --- a/modules/commands/path_command.py +++ b/modules/commands/path_command.py @@ -11,6 +11,7 @@ import time from typing import Any, Callable, Optional from ..models import MeshMessage +from ..security_utils import sanitize_name from ..utils import calculate_distance, parse_path_string from .base_command import BaseCommand @@ -769,7 +770,7 @@ class PathCommand(BaseCommand): # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier - self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") + self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {sanitize_name(repeater.get('name', 'unknown'))}") # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred @@ -778,7 +779,7 @@ class PathCommand(BaseCommand): # Add bonus proportional to zero-hop bonus (20% of combined score) snr_bonus = combined_score * 0.2 combined_score += snr_bonus - self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") + self.logger.debug(f"SNR bonus for {sanitize_name(repeater.get('name', 'unknown'))}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") combined_scores.append((combined_score, distance, repeater)) @@ -1165,7 +1166,7 @@ class PathCommand(BaseCommand): # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier - self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") + self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {sanitize_name(repeater.get('name', 'unknown'))}") # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred @@ -1174,7 +1175,7 @@ class PathCommand(BaseCommand): # Add bonus proportional to zero-hop bonus (20% of combined score) snr_bonus = combined_score * 0.2 combined_score += snr_bonus - self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") + self.logger.debug(f"SNR bonus for {sanitize_name(repeater.get('name', 'unknown'))}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") if combined_score > best_combined_score: best_combined_score = combined_score @@ -1248,7 +1249,7 @@ class PathCommand(BaseCommand): # Apply star bias multiplier if repeater is starred if repeater.get('is_starred', False): combined_score *= self.star_bias_multiplier - self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')}") + self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {sanitize_name(repeater.get('name', 'unknown'))}") # SNR bonus: If repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is strong evidence it's close and should be preferred @@ -1257,7 +1258,7 @@ class PathCommand(BaseCommand): # Add bonus proportional to zero-hop bonus (20% of combined score) snr_bonus = combined_score * 0.2 combined_score += snr_bonus - self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") + self.logger.debug(f"SNR bonus for {sanitize_name(repeater.get('name', 'unknown'))}: +{snr_bonus:.3f} (has SNR data, confirmed zero-hop)") all_scores.append((repeater.get('name', 'unknown'), distance, recency_score, proximity_score, combined_score)) @@ -1358,7 +1359,7 @@ class PathCommand(BaseCommand): stored_to_key = prev_to_candidate_edge.get('to_public_key', '').lower() if prev_to_candidate_edge.get('to_public_key') else None if stored_to_key and stored_to_key == candidate_public_key: stored_key_bonus = max(stored_key_bonus, 0.4) # Strong bonus for matching stored key - self.logger.debug(f"Found stored public key match for {repeater.get('name', 'unknown')} in edge {prev_norm}->{candidate_norm}") + self.logger.debug(f"Found stored public key match for {sanitize_name(repeater.get('name', 'unknown'))} in edge {prev_norm}->{candidate_norm}") # Check edge from candidate to next node if next_norm: @@ -1367,7 +1368,7 @@ class PathCommand(BaseCommand): stored_from_key = candidate_to_next_edge.get('from_public_key', '').lower() if candidate_to_next_edge.get('from_public_key') else None if stored_from_key and stored_from_key == candidate_public_key: stored_key_bonus = max(stored_key_bonus, 0.4) # Strong bonus for matching stored key - self.logger.debug(f"Found stored public key match for {repeater.get('name', 'unknown')} in edge {candidate_norm}->{next_norm}") + self.logger.debug(f"Found stored public key match for {sanitize_name(repeater.get('name', 'unknown'))} in edge {candidate_norm}->{next_norm}") # Zero-hop bonus: If this repeater has been heard directly by the bot (zero-hop advert), # it's strong evidence it's close and should be preferred, even for intermediate hops. @@ -1378,7 +1379,7 @@ class PathCommand(BaseCommand): if hop_count is not None and hop_count == 0 and graph_score > 0: # This repeater has been heard directly - strong evidence it's close to bot zero_hop_bonus = self.graph_zero_hop_bonus - self.logger.debug(f"Zero-hop bonus for {repeater.get('name', 'unknown')}: {zero_hop_bonus:.2%} (heard directly by bot)") + self.logger.debug(f"Zero-hop bonus for {sanitize_name(repeater.get('name', 'unknown'))}: {zero_hop_bonus:.2%} (heard directly by bot)") # SNR bonus: If this repeater has SNR data, it's a zero-hop repeater (direct neighbor) # This is even stronger evidence than just hop_count == 0, as it means we have actual signal quality data. @@ -1389,7 +1390,7 @@ class PathCommand(BaseCommand): # SNR presence indicates zero-hop connection with signal quality data # Use same bonus as zero-hop, but this is more definitive snr_bonus = self.graph_zero_hop_bonus * 1.2 # 20% stronger than zero-hop bonus alone - self.logger.debug(f"SNR bonus for {repeater.get('name', 'unknown')}: {snr_bonus:.2%} (has SNR data, confirmed zero-hop)") + self.logger.debug(f"SNR bonus for {sanitize_name(repeater.get('name', 'unknown'))}: {snr_bonus:.2%} (has SNR data, confirmed zero-hop)") # Add stored key bonus, zero-hop bonus, and SNR bonus to graph score graph_score_with_bonus = min(1.0, graph_score + stored_key_bonus + zero_hop_bonus + snr_bonus) @@ -1454,7 +1455,7 @@ class PathCommand(BaseCommand): path_validation_bonus = max(path_validation_bonus, segment_bonus + obs_bonus) # Cap at max bonus path_validation_bonus = min(self.graph_path_validation_max_bonus, path_validation_bonus) - self.logger.debug(f"Path validation match for {repeater.get('name', 'unknown')}: {common_segments} common segments (obs: {obs_count})") + self.logger.debug(f"Path validation match for {sanitize_name(repeater.get('name', 'unknown'))}: {common_segments} common segments (obs: {obs_count})") if path_validation_bonus >= self.graph_path_validation_max_bonus * 0.9: break # Strong match found except Exception as e: @@ -1521,7 +1522,7 @@ class PathCommand(BaseCommand): # Apply penalty: up to penalty_strength reduction penalty = normalized_excess * self.graph_distance_penalty_strength candidate_score = candidate_score * (1.0 - penalty) - self.logger.debug(f"Applied distance penalty to {repeater.get('name', 'unknown')}: {max_distance:.1f}km hop (penalty: {penalty:.2%}, score: {candidate_score:.3f})") + self.logger.debug(f"Applied distance penalty to {sanitize_name(repeater.get('name', 'unknown'))}: {max_distance:.1f}km hop (penalty: {penalty:.2%}, score: {candidate_score:.3f})") elif max_distance > 0: # Even if under threshold, very long hops should get a small penalty # This helps prefer shorter hops when graph evidence is similar @@ -1549,7 +1550,7 @@ class PathCommand(BaseCommand): # Apply max distance threshold if configured if self.graph_final_hop_max_distance > 0 and distance > self.graph_final_hop_max_distance: # Beyond max distance - skip proximity bonus - self.logger.debug(f"Final hop candidate {repeater.get('name', 'unknown')} is {distance:.1f}km from bot, beyond max distance {self.graph_final_hop_max_distance:.1f}km") + self.logger.debug(f"Final hop candidate {sanitize_name(repeater.get('name', 'unknown'))} is {distance:.1f}km from bot, beyond max distance {self.graph_final_hop_max_distance:.1f}km") else: # Normalize distance to 0-1 score (inverse: closer = higher score) # Use configurable normalization distance (default 500km for more aggressive scoring) @@ -1569,14 +1570,14 @@ class PathCommand(BaseCommand): # Combine with graph score using effective weight candidate_score = candidate_score * (1.0 - effective_weight) + proximity_score * effective_weight - self.logger.debug(f"Final hop proximity for {repeater.get('name', 'unknown')}: distance={distance:.1f}km, proximity_score={proximity_score:.3f}, effective_weight={effective_weight:.3f}, combined_score={candidate_score:.3f}") + self.logger.debug(f"Final hop proximity for {sanitize_name(repeater.get('name', 'unknown'))}: distance={distance:.1f}km, proximity_score={proximity_score:.3f}, effective_weight={effective_weight:.3f}, combined_score={candidate_score:.3f}") else: # Repeater without valid location data - apply significant penalty for final hop # This ensures we prefer repeaters with known locations, especially direct neighbors # Penalty: reduce score by 50% (repeaters with location data will have proximity bonus, so this creates strong preference) location_penalty = 0.5 candidate_score = candidate_score * (1.0 - location_penalty) - self.logger.debug(f"Final hop candidate {repeater.get('name', 'unknown')} has no valid location data - applying {location_penalty:.0%} penalty (score: {candidate_score:.3f})") + self.logger.debug(f"Final hop candidate {sanitize_name(repeater.get('name', 'unknown'))} has no valid location data - applying {location_penalty:.0%} penalty (score: {candidate_score:.3f})") # Apply star bias multiplier if repeater is starred # Starred repeaters should get significant advantage in graph selection @@ -1586,7 +1587,7 @@ class PathCommand(BaseCommand): candidate_score *= self.star_bias_multiplier # Cap at 1.0 but allow it to exceed temporarily for comparison # We'll normalize later when converting to confidence - self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {repeater.get('name', 'unknown')} in graph selection (score: {candidate_score:.3f})") + self.logger.debug(f"Applied star bias ({self.star_bias_multiplier}x) to {sanitize_name(repeater.get('name', 'unknown'))} in graph selection (score: {candidate_score:.3f})") if candidate_score > best_score: best_score = candidate_score diff --git a/modules/commands/solarforecast_command.py b/modules/commands/solarforecast_command.py index f8e5139..dce27e2 100644 --- a/modules/commands/solarforecast_command.py +++ b/modules/commands/solarforecast_command.py @@ -13,6 +13,7 @@ from typing import Optional import requests from ..models import MeshMessage +from ..security_utils import sanitize_name from ..utils import ( abbreviate_location, geocode_city, @@ -345,7 +346,7 @@ class SolarforecastCommand(BaseCommand): ''' all_repeaters = self.bot.db_manager.execute_query(all_repeaters_query) if all_repeaters: - self.logger.debug(f"Sample repeaters in DB: {[r.get('name') for r in all_repeaters[:5]]}") + self.logger.debug(f"Sample repeaters in DB: {[sanitize_name(r.get('name', '')) for r in all_repeaters[:5]]}") return None, None except Exception as e: @@ -610,7 +611,7 @@ class SolarforecastCommand(BaseCommand): if 'message' in data: msg = data['message'] if msg.get('type') != 'success': - self.logger.error(f"Forecast.Solar API error: {msg.get('text', 'Unknown')}") + self.logger.error(f"Forecast.Solar API error: {sanitize_name(msg.get('text', 'Unknown'))}") return None result = data.get('result', {}) diff --git a/modules/feed_manager.py b/modules/feed_manager.py index be86dea..71bf208 100644 --- a/modules/feed_manager.py +++ b/modules/feed_manager.py @@ -22,6 +22,7 @@ import aiohttp import feedparser from modules.feed_filter_eval import item_passes_filter_config +from modules.security_utils import sanitize_input, validate_external_url from modules.url_shortener import _coerce_url_string, shorten_url_sync @@ -181,6 +182,12 @@ class FeedManager: feed['channel_name'] try: + # Validate URL for SSRF protection + if not validate_external_url(feed_url): + self.logger.error(f"Feed URL validation failed: {feed_url}") + self._record_feed_error(feed_id, 'security', 'Invalid or unsafe URL') + return + self.logger.debug(f"Polling {feed_type} feed {feed_id}: {feed_url}") # Rate limit per domain @@ -927,8 +934,10 @@ class FeedManager: format_str = feed.get('output_format') or self.default_output_format # Extract field values (DB/feed may store NULL; .get('k', default) still returns None if key present) - title = item.get('title') or 'Untitled' - body = item.get('description', '') or item.get('body', '') + # Use sanitize_input with max_length=None to only strip control characters without truncating + # Truncation happens later via _apply_shortening when needed + title = sanitize_input(item.get('title') or 'Untitled', max_length=None) + body = sanitize_input(item.get('description', '') or item.get('body', ''), max_length=None) # Clean HTML from body if present if body: body = html.unescape(body) diff --git a/modules/maintenance.py b/modules/maintenance.py index ec15cf7..cd74492 100644 --- a/modules/maintenance.py +++ b/modules/maintenance.py @@ -11,6 +11,8 @@ import time from pathlib import Path from typing import TYPE_CHECKING, Any, Callable +from .security_utils import validate_external_url + if TYPE_CHECKING: pass @@ -359,6 +361,14 @@ class MaintenanceRunner: ) return + allow_local = self.get_notif('allow_local_smtp').lower() == 'true' + if not validate_external_url(f'http://{smtp_host}', allow_private=allow_local): + self.logger.error( + "Nightly email aborted: SMTP host %r resolves to a private or reserved address", + smtp_host, + ) + return + try: smtp_port = int(self.get_notif('smtp_port') or (465 if smtp_security == 'ssl' else 587)) except ValueError: diff --git a/modules/message_handler.py b/modules/message_handler.py index 65f5a62..60b5969 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -15,7 +15,7 @@ from typing import Any, Optional from .enums import AdvertFlags, DeviceRole, PayloadType, PayloadVersion, RouteType from .graph_trace_helper import update_mesh_graph_from_trace_data from .models import MeshMessage -from .security_utils import sanitize_input +from .security_utils import sanitize_input, sanitize_name from .utils import ( calculate_packet_hash, decode_path_len_byte, @@ -147,7 +147,7 @@ class MessageHandler: self.logger.debug(f"Payload keys: {list(payload.keys())}") self.logger.debug(f"Event metadata: {event.metadata if hasattr(event, 'metadata') else 'None'}") - self.logger.info(f"Received DM from {payload.get('pubkey_prefix', 'unknown')}: {payload.get('text', '')}") + self.logger.info(f"Received DM from {sanitize_name(payload.get('pubkey_prefix', 'unknown'))}: {sanitize_name(payload.get('text', ''))}") # Extract path information from contacts using pubkey_prefix path_info = "Unknown" @@ -510,7 +510,7 @@ class MessageHandler: parsed_advert = self.parse_advert(payload_bytes) if parsed_advert: advert_data = parsed_advert - self.logger.info(f"✅ Parsed ADVERT: {advert_data.get('mode', 'Unknown')} - {advert_data.get('name', 'No name')}") + self.logger.info(f"✅ Parsed ADVERT: {sanitize_name(advert_data.get('mode', 'Unknown'))} - {sanitize_name(advert_data.get('name', 'No name'))}") except Exception as e: self.logger.warning(f"Failed to parse ADVERT payload: {e}") @@ -640,7 +640,7 @@ class MessageHandler: self.logger.info(f"📡 Tracked {mode}: {name}{location}{hop_info}") else: - self.logger.warning(f"Failed to track contact advertisement: {advert_data.get('name', 'Unknown')}") + self.logger.warning(f"Failed to track contact advertisement: {sanitize_name(advert_data.get('name', 'Unknown'))}") except Exception as e: self.logger.error(f"Error processing advertisement packet: {e}") diff --git a/modules/profanity_filter.py b/modules/profanity_filter.py index c8b93e5..00c7bb0 100644 --- a/modules/profanity_filter.py +++ b/modules/profanity_filter.py @@ -8,7 +8,7 @@ better-profanity can detect them. Also checks for hate symbols (e.g. swastika Unicode) that word lists do not catch. """ -from typing import Any, Optional +from typing import Optional # Unicode code points for symbols we treat as profanity (e.g. swastika forms). # These are checked in addition to better-profanity's word list. @@ -22,23 +22,17 @@ _profanity_initialized = False _warned_unavailable = False _unidecode_available = False -profanity: Any = None try: - from better_profanity import profanity as _better_profanity - - profanity = _better_profanity + from better_profanity import profanity _profanity_available = True except ImportError: - pass + profanity = None -unidecode: Any = None try: - from unidecode import unidecode as _unidecode_impl - - unidecode = _unidecode_impl + from unidecode import unidecode _unidecode_available = True except ImportError: - pass + unidecode = None def _has_hate_symbols(text: str) -> bool: diff --git a/modules/repeater_manager.py b/modules/repeater_manager.py index 487212d..53ab929 100644 --- a/modules/repeater_manager.py +++ b/modules/repeater_manager.py @@ -12,6 +12,7 @@ from typing import Any, Optional from meshcore import EventType +from .security_utils import sanitize_name from .utils import rate_limited_nominatim_reverse_sync @@ -1815,7 +1816,7 @@ class RepeaterManager: # Debug logging for first few contacts to understand structure if processed_count <= 5: - self.logger.debug(f"Contact {processed_count}: {contact_data.get('name', 'Unknown')} (type: {contact_data.get('type')}, keys: {list(contact_data.keys())})") + self.logger.debug(f"Contact {processed_count}: {sanitize_name(contact_data.get('name', 'Unknown'))} (type: {contact_data.get('type')}, keys: {list(contact_data.keys())})") if self._is_repeater_device(contact_data): public_key = contact_data.get('public_key', contact_key) @@ -2549,7 +2550,7 @@ class RepeaterManager: 'days_stale': (datetime.now() - last_seen_dt).days }) except Exception as e: - self.logger.debug(f"Error parsing timestamp for contact {contact_data.get('name', 'Unknown')}: {e}") + self.logger.debug(f"Error parsing timestamp for contact {sanitize_name(contact_data.get('name', 'Unknown'))}: {e}") continue # Sort by days stale (oldest first) @@ -2650,7 +2651,7 @@ class RepeaterManager: await asyncio.sleep(1) except Exception as e: - self.logger.error(f"Error removing stale contact {contact.get('name', 'Unknown')}: {e}") + self.logger.error(f"Error removing stale contact {sanitize_name(contact.get('name', 'Unknown'))}: {e}") continue return removed_count diff --git a/modules/scheduler.py b/modules/scheduler.py index 5059e5f..3435621 100644 --- a/modules/scheduler.py +++ b/modules/scheduler.py @@ -901,8 +901,130 @@ class MessageScheduler: def _format_email_body(self, stats: dict[str, Any], period_start: str, period_end: str) -> str: return self.maintenance.format_email_body(stats, period_start, period_end) - def _send_nightly_email(self) -> None: - self.maintenance.send_nightly_email() + # ── Zombie radio alert email ───────────────────────────────────────────── + + def send_zombie_alert_email(self, fail_count: int, threshold: int, interval: int) -> None: + """Send an immediate alert email when a zombie radio is detected. + + Uses the same SMTP settings as the nightly digest. Recipients are taken + from the ``radio_zombie_alert_email`` config key; if that key is empty the + nightly maintenance recipients are used as a fallback. + + This method is intentionally synchronous so it can be run in a thread + executor from the async event loop without blocking it. + """ + import smtplib + import ssl as _ssl + from email.message import EmailMessage + + if not self.bot.config.getboolean('Bot', 'radio_zombie_alert_enabled', fallback=True): + return + + smtp_host = self._get_notif('smtp_host') + smtp_security = self._get_notif('smtp_security') or 'starttls' + smtp_user = self._get_notif('smtp_user') + smtp_password = self._get_notif('smtp_password') + from_name = self._get_notif('from_name') or 'MeshCore Bot' + from_email = self._get_notif('from_email') + + # Alert recipients: dedicated config key, falls back to nightly recipients + alert_email_cfg = self.bot.config.get('Bot', 'radio_zombie_alert_email', fallback='').strip() + if alert_email_cfg: + recipients = [r.strip() for r in alert_email_cfg.split(',') if r.strip()] + else: + recipients = [r.strip() for r in self._get_notif('recipients').split(',') if r.strip()] + + if not smtp_host or not from_email or not recipients: + self.bot.logger.warning( + "Zombie alert email enabled but SMTP settings incomplete " + f"(host={smtp_host!r}, from={from_email!r}, recipients={recipients}) " + "— alert email not sent" + ) + return + + allow_local = self._get_notif('allow_local_smtp').lower() == 'true' + if not validate_external_url(f'http://{smtp_host}', allow_private=allow_local): + self.bot.logger.error( + "Zombie alert email aborted: SMTP host %r resolves to a private or reserved address", + smtp_host, + ) + return + + try: + smtp_port = int(self._get_notif('smtp_port') or (465 if smtp_security == 'ssl' else 587)) + except ValueError: + smtp_port = 587 + + now_utc = datetime.datetime.utcnow() + connection_type = self.bot.config.get('Connection', 'connection_type', fallback='unknown') + serial_port = self.bot.config.get('Connection', 'serial_port', fallback='n/a') + interval_min = interval // 60 + + subject = ( + f'ALERT: MeshCore Bot — Zombie Radio Detected ' + f'[{now_utc.strftime("%Y-%m-%d %H:%M UTC")}]' + ) + body = '\n'.join([ + 'MeshCore Bot — Zombie Radio Alert', + '=' * 44, + f'Time : {now_utc.strftime("%Y-%m-%d %H:%M:%S UTC")}', + '', + 'RADIO STATUS', + '─' * 30, + f' Connection : {connection_type}', + f' Port / Device : {serial_port}', + f' Failed probes : {fail_count} of {threshold} (threshold)', + f' Probe interval: {interval}s ({interval_min} min)', + '', + 'ACTION REQUIRED', + '─' * 30, + ' The radio firmware is unresponsive (zombie state).', + ' A physical POWER CYCLE of the radio is required.', + ' Disconnect/reconnect of the serial/BLE transport will NOT fix this.', + '', + ' Steps to recover:', + ' 1. Power off the radio hardware', + ' 2. Wait 10 seconds', + ' 3. Power on the radio hardware', + ' 4. The bot will reconnect and resume normal operation automatically', + '', + '─' * 44, + 'Probe monitoring has been suspended to avoid log spam.', + 'It will resume automatically after the next successful reconnect.', + ]) + + try: + msg = EmailMessage() + msg['Subject'] = subject + msg['From'] = f'{from_name} <{from_email}>' + msg['To'] = ', '.join(recipients) + msg.set_content(body) + + context = _ssl.create_default_context() + _smtp_timeout = 30 + + if smtp_security == 'ssl': + with smtplib.SMTP_SSL(smtp_host, smtp_port, context=context, timeout=_smtp_timeout) as s: + if smtp_user and smtp_password: + s.login(smtp_user, smtp_password) + s.send_message(msg) + else: + with smtplib.SMTP(smtp_host, smtp_port, timeout=_smtp_timeout) as s: + if smtp_security == 'starttls': + s.ehlo() + s.starttls(context=context) + s.ehlo() + if smtp_user and smtp_password: + s.login(smtp_user, smtp_password) + s.send_message(msg) + + self.bot.logger.info( + f"Zombie radio alert email sent to {recipients}" + ) + except Exception as e: + self.bot.logger.error(f"Failed to send zombie radio alert email: {e}") + + # ── Maintenance helpers ────────────────────────────────────────────────── def _get_maint(self, key: str) -> str: return self.maintenance.get_maint(key) diff --git a/modules/security_utils.py b/modules/security_utils.py index 5fff5d8..3c71457 100644 --- a/modules/security_utils.py +++ b/modules/security_utils.py @@ -16,6 +16,9 @@ from urllib.parse import urlparse logger = logging.getLogger('MeshCoreBot.Security') +# CGN (Carrier-Grade NAT) network 100.64.0.0/10 - RFC 6598 +_CGN_NETWORK = ipaddress.ip_network("100.64.0.0/10") + def _is_nix_environment() -> bool: """ @@ -42,13 +45,19 @@ def _is_nix_environment() -> bool: return bool(any(var in os.environ for var in nix_env_vars)) -def validate_external_url(url: str, allow_localhost: bool = False, timeout: float = 2.0) -> bool: +def validate_external_url( + url: str, + allow_private: bool = False, + allow_loopback: bool | None = None, # Deprecated: use allow_private=True instead + timeout: float = 2.0 +) -> bool: """ Validate that URL points to safe external resource (SSRF protection) Args: url: URL to validate - allow_localhost: Whether to allow localhost/private IPs (default: False) + allow_private: Whether to allow private/internal IPs (default: False) + allow_loopback: Deprecated alias for allow_private timeout: DNS resolution timeout in seconds (default: 2.0) Returns: @@ -56,6 +65,10 @@ def validate_external_url(url: str, allow_localhost: bool = False, timeout: floa Raises: ValueError: If URL is invalid or unsafe + + Note: + - allow_loopback=True only permits loopback addresses (127.0.0.1, ::1) + - allow_private=True permits all internal ranges (loopback, RFC1918, CGN, link-local) """ try: parsed = urlparse(url) @@ -84,17 +97,30 @@ def validate_external_url(url: str, allow_localhost: bool = False, timeout: floa ip_obj = ipaddress.ip_address(ip) - # If localhost is not allowed, reject private/internal IPs - if not allow_localhost: - # Reject private/internal IPs + # If loopback is not allowed, reject loopback addresses + if allow_loopback is True: + # Only allow loopback, reject everything else + if not ip_obj.is_loopback: + logger.warning(f"URL resolves to non-loopback IP with allow_loopback: {ip}") + return False + elif allow_loopback is False or not allow_private: + # Reject private/internal IPs (RFC1918, CGN, link-local) if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local: logger.warning(f"URL resolves to private/internal IP: {ip}") return False + # Reject CGN (Carrier-Grade NAT) - RFC 6598 + if ip_obj in _CGN_NETWORK: + logger.warning(f"URL resolves to CGN IP: {ip}") + return False + # Reject reserved ranges if ip_obj.is_reserved or ip_obj.is_multicast: logger.warning(f"URL resolves to reserved/multicast IP: {ip}") return False + else: + # allow_private=True: allow all internal ranges + pass except socket.gaierror as e: logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") @@ -243,6 +269,31 @@ def sanitize_input(content: str, max_length: Optional[int] = 500, strip_controls return content.strip() +def sanitize_name(name: object, max_length: int = 64) -> str: + """ + Sanitize a short identifier (node name, channel name, etc.) for safe logging and storage. + + Strips all control characters including newlines, carriage returns, tabs, + null bytes, and ANSI escape sequences. Truncates to max_length. + + Args: + name: Value to sanitize (coerced to str if not already). + max_length: Maximum character length (default: 64). + + Returns: + Sanitized string. + + Raises: + ValueError: If max_length is negative. + """ + if max_length < 0: + raise ValueError(f"max_length must be non-negative, got {max_length}") + text = str(name) if not isinstance(name, str) else name + # Strip all control characters (including \n \r \t \x00 and ANSI escapes) + text = re.sub(r'[\x00-\x1f\x7f]|\x1b\[[0-9;]*[A-Za-z]', '', text) + return text[:max_length] + + # Valid SQLite journal modes for PRAGMA journal_mode validation VALID_JOURNAL_MODES = {"DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", "OFF"} diff --git a/modules/service_plugins/discord_bridge_service.py b/modules/service_plugins/discord_bridge_service.py index b7a66cd..487807d 100644 --- a/modules/service_plugins/discord_bridge_service.py +++ b/modules/service_plugins/discord_bridge_service.py @@ -36,6 +36,7 @@ except ImportError: import contextlib from ..profanity_filter import censor, contains_profanity +from ..security_utils import sanitize_name from .base_service import BaseServicePlugin @@ -618,7 +619,7 @@ class DiscordBridgeService(BaseServicePlugin): # Check response status if response.status == 204: # Success (Discord webhooks return 204 No Content on success) - self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...") + self.logger.debug(f"Posted to Discord [{channel_name}]: {sanitize_name(payload['content'])[:50]}...") # Monitor rate limit headers self._check_rate_limit_headers(response.headers, webhook_url, channel_name) return True @@ -675,7 +676,7 @@ class DiscordBridgeService(BaseServicePlugin): # Check response status if response.status_code == 204: # Success - self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...") + self.logger.debug(f"Posted to Discord [{channel_name}]: {sanitize_name(payload['content'])[:50]}...") # Monitor rate limit headers self._check_rate_limit_headers(response.headers, webhook_url, channel_name) return True diff --git a/modules/web_viewer/app.py b/modules/web_viewer/app.py index 7884d95..97a5497 100644 --- a/modules/web_viewer/app.py +++ b/modules/web_viewer/app.py @@ -40,7 +40,11 @@ from flask import ( ) from flask_socketio import SocketIO, disconnect, emit -from modules.security_utils import VALID_JOURNAL_MODES, validate_sql_identifier +from modules.security_utils import ( + VALID_JOURNAL_MODES, + validate_external_url, + validate_sql_identifier, +) from modules.version_info import resolve_runtime_version @@ -1340,6 +1344,7 @@ class BotDataViewer: 'notif.smtp_user', 'notif.smtp_password', 'notif.from_name', 'notif.from_email', 'notif.recipients', 'notif.nightly_enabled', + 'notif.allow_local_smtp', ] settings = {} for k in keys: @@ -1399,6 +1404,14 @@ class BotDataViewer: if not recipients: return jsonify({'error': 'No recipients configured'}), 400 + # Validate SMTP host for SSRF protection + # allow_local_smtp=true permits private/internal SMTP hosts (e.g., local Postfix) + allow_local_smtp = _get('allow_local_smtp').lower() == 'true' + if not validate_external_url(f'http://{smtp_host}', allow_private=allow_local_smtp): + if allow_local_smtp: + return jsonify({'error': 'Invalid or unsafe SMTP host'}), 400 + return jsonify({'error': 'Invalid or unsafe SMTP host (private/internal IP blocked)'}), 400 + try: msg = EmailMessage() msg['Subject'] = 'MeshCore Bot — test email' @@ -1741,12 +1754,23 @@ class BotDataViewer: backup_dir_str = self.db_manager.get_metadata('maint.db_backup_dir') or '' if not backup_dir_str or not os.path.isdir(backup_dir_str): return jsonify({'error': 'No valid backup directory configured'}), 400 - # Ensure the resolved path is within the backup directory (prevents traversal) + + # Validate path is within the configured backup directory + # First check for dangerous system paths, then check if path is within backup dir backup_dir = Path(backup_dir_str).resolve() src = Path(db_file).resolve() + + # Check for dangerous system paths first (returns 400) + target_str = str(src).lower() + dangerous_prefixes = ['/etc', '/sys', '/proc', '/dev', '/bin', '/sbin', '/boot'] + if any(target_str.startswith(prefix) for prefix in dangerous_prefixes): + return jsonify({'error': 'Access to system directory denied'}), 400 + + # Check if path is within the backup directory (prevents traversal) try: src.relative_to(backup_dir) except ValueError: + # Path is outside backup directory - return 403 return jsonify({'error': 'Restore path must be within the configured backup directory'}), 403 if not src.exists(): @@ -3318,7 +3342,11 @@ class BotDataViewer: fallback='{emoji} {body|truncate:100} - {date}\n{link|truncate:50}') # Fetch and format feed items - preview_items = self._preview_feed_items(feed_url, feed_type, output_format, api_config, filter_config, sort_config) + try: + preview_items = self._preview_feed_items(feed_url, feed_type, output_format, api_config, filter_config, sort_config) + except ValueError as e: + # SSRF validation error - return 400 + return jsonify({'error': str(e)}), 400 return jsonify({ 'success': True, @@ -3336,14 +3364,13 @@ class BotDataViewer: if not data or 'url' not in data: return jsonify({'error': 'URL is required'}), 400 - # This would require feed_manager - for now just validate URL - from urllib.parse import urlparse url = data['url'] - result = urlparse(url) - if not all([result.scheme in ['http', 'https'], result.netloc]): - return jsonify({'error': 'Invalid URL format'}), 400 - return jsonify({'success': True, 'message': 'URL validated (full test requires feed manager)'}) + # Validate URL for SSRF protection + if not validate_external_url(url): + return jsonify({'error': 'Invalid or unsafe URL'}), 400 + + return jsonify({'success': True, 'message': 'URL validated'}) except Exception as e: self.logger.error(f"Error testing feed: {e}") return jsonify({'error': str(e)}), 500 @@ -5933,6 +5960,10 @@ class BotDataViewer: try: items = [] + # Validate URL for SSRF protection + if not validate_external_url(feed_url): + raise ValueError("Invalid or unsafe feed URL") + if feed_type == 'rss': # Fetch RSS feed response = requests.get(feed_url, timeout=30, headers={'User-Agent': 'MeshCoreBot/1.0 FeedManager'}) diff --git a/tests/test_core.py b/tests/test_core.py index 641b158..d31155f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -282,6 +282,7 @@ class TestLoopExceptionHandler: mock_loop.default_exception_handler.assert_called_once_with(ctx) +<<<<<<< HEAD # --------------------------------------------------------------------------- # _probe_radio_health (PR4 — zombie-connection detection) # --------------------------------------------------------------------------- @@ -538,3 +539,5 @@ async def _make_coro_async(value): def _make_coro(value): """Return a coroutine that immediately resolves to *value*.""" return _make_coro_async(value) +======= +>>>>>>> 38d040a (security: SSRF hardening, log injection sanitization, and allow_local_smtp) diff --git a/tests/test_scheduler_logic.py b/tests/test_scheduler_logic.py index 6d886a2..cbd5557 100644 --- a/tests/test_scheduler_logic.py +++ b/tests/test_scheduler_logic.py @@ -696,9 +696,9 @@ class TestSendNightlyEmailDisabled: def _get_notif(key): return {"nightly_enabled": "false"}.get(key, "") - scheduler._get_notif = Mock(side_effect=_get_notif) + scheduler.maintenance.get_notif = Mock(side_effect=_get_notif) # Should not raise and should not call smtplib - scheduler._send_nightly_email() + scheduler.maintenance.send_nightly_email() # No assertion needed — if it reaches here without smtplib, it returned early @@ -1308,6 +1308,7 @@ class TestCollectEmailStats: assert result.get("contacts_24h") == 10 assert result.get("contacts_new_24h") == 3 +<<<<<<< HEAD # --------------------------------------------------------------------------- # _send_interval_advert_async (PR2 fix — Event-based error detection) @@ -1460,3 +1461,101 @@ class TestSendScheduledMessageAsyncTimeout: assert call_args_list, "logger.error was never called" logged = str(call_args_list[0]) assert "TimeoutError" in logged +======= +# --------------------------------------------------------------------------- +# SSRF guard — SMTP host validation in email-sending methods +# --------------------------------------------------------------------------- + + +def _make_smtp_scheduler(notif_overrides: dict) -> "MessageScheduler": + """Return a scheduler whose maintenance.get_notif returns values from notif_overrides.""" + bot = Mock() + bot.logger = Mock() + bot.config = ConfigParser() + bot.config.add_section("Bot") + bot.config.set("Bot", "radio_zombie_alert_enabled", "true") + bot.config.set("Bot", "radio_zombie_alert_email", "") + bot.config.add_section("Connection") + bot.config.set("Connection", "connection_type", "serial") + sched = MessageScheduler(bot) + defaults = { + "nightly_enabled": "true", + "smtp_host": "smtp.example.com", + "smtp_port": "587", + "smtp_security": "starttls", + "smtp_user": "", + "smtp_password": "", + "from_name": "Bot", + "from_email": "bot@example.com", + "recipients": "admin@example.com", + } + defaults.update(notif_overrides) + sched.maintenance.get_notif = Mock(side_effect=lambda k: defaults.get(k, "")) + sched._get_notif = Mock(side_effect=lambda k: defaults.get(k, "")) # zombie alert still uses _get_notif + return sched + + +class TestNightlyEmailSsrfGuard: + """send_nightly_email must abort on private IP unless allow_local_smtp=true.""" + + @pytest.mark.parametrize("bad_host", [ + "10.0.0.1", # RFC 1918 10.0.0.0/8 + "172.16.0.1", # RFC 1918 172.16.0.0/12 + "192.168.1.1", # RFC 1918 192.168.0.0/16 + "127.0.0.1", # RFC 1122 127.0.0.0/8 loopback + "169.254.0.1", # RFC 3927 169.254.0.0/16 link-local + "100.64.0.1", # RFC 6598 100.64.0.0/10 shared/CGN + "::1", # RFC 4291 ::1/128 IPv6 loopback + "fd00::1", # RFC 4193 fc00::/7 IPv6 ULA + "fe80::1", # RFC 4291 fe80::/10 IPv6 link-local + ]) + def test_private_smtp_host_aborts_nightly_email(self, bad_host): + sched = _make_smtp_scheduler({"smtp_host": bad_host}) + sched.maintenance.send_nightly_email() + sched.bot.logger.error.assert_called() + logged = str(sched.bot.logger.error.call_args_list) + assert "private" in logged.lower() or "reserved" in logged.lower() + + @pytest.mark.parametrize("local_host", ["127.0.0.1", "192.168.1.1"]) + def test_allow_local_smtp_bypasses_ssrf_guard(self, local_host): + """allow_local_smtp=true permits private-IP SMTP (e.g. local Postfix).""" + sched = _make_smtp_scheduler({"smtp_host": local_host, "allow_local_smtp": "true"}) + # Should not abort at the SSRF guard — will fail at smtplib (connection refused) + # so error log must NOT contain the SSRF rejection message + with patch("smtplib.SMTP", side_effect=ConnectionRefusedError("no server")): + with patch("smtplib.SMTP_SSL", side_effect=ConnectionRefusedError("no server")): + sched.maintenance.send_nightly_email() + logged = str(sched.bot.logger.error.call_args_list) + assert "private" not in logged.lower() and "reserved" not in logged.lower() + + +class TestZombieAlertEmailSsrfGuard: + """send_zombie_alert_email must abort on private IP unless allow_local_smtp=true.""" + + @pytest.mark.parametrize("bad_host", [ + "10.0.0.1", # RFC 1918 10.0.0.0/8 + "172.16.0.1", # RFC 1918 172.16.0.0/12 + "192.168.1.1", # RFC 1918 192.168.0.0/16 + "127.0.0.1", # RFC 1122 127.0.0.0/8 loopback + "169.254.0.1", # RFC 3927 169.254.0.0/16 link-local + "100.64.0.1", # RFC 6598 100.64.0.0/10 shared/CGN + "::1", # RFC 4291 ::1/128 IPv6 loopback + "fd00::1", # RFC 4193 fc00::/7 IPv6 ULA + "fe80::1", # RFC 4291 fe80::/10 IPv6 link-local + ]) + def test_private_smtp_host_aborts_zombie_alert(self, bad_host): + sched = _make_smtp_scheduler({"smtp_host": bad_host}) + sched.send_zombie_alert_email(fail_count=5, threshold=3, interval=60) + sched.bot.logger.error.assert_called() + logged = str(sched.bot.logger.error.call_args_list) + assert "private" in logged.lower() or "reserved" in logged.lower() + + def test_allow_local_smtp_bypasses_ssrf_guard_zombie_alert(self): + """allow_local_smtp=true permits private-IP SMTP for zombie alert.""" + sched = _make_smtp_scheduler({"smtp_host": "127.0.0.1", "allow_local_smtp": "true"}) + with patch("smtplib.SMTP", side_effect=ConnectionRefusedError("no server")): + with patch("smtplib.SMTP_SSL", side_effect=ConnectionRefusedError("no server")): + sched.send_zombie_alert_email(fail_count=5, threshold=3, interval=60) + logged = str(sched.bot.logger.error.call_args_list) + assert "private" not in logged.lower() and "reserved" not in logged.lower() +>>>>>>> 38d040a (security: SSRF hardening, log injection sanitization, and allow_local_smtp) diff --git a/tests/test_security_utils.py b/tests/test_security_utils.py index c212113..0d3a71b 100644 --- a/tests/test_security_utils.py +++ b/tests/test_security_utils.py @@ -76,14 +76,56 @@ class TestValidateExternalUrl: assert validate_external_url("https://example.com/") is True assert validate_external_url("http://example.com/") is True - def test_localhost_rejected_by_default(self): + def test_loopback_rejected_by_default(self): with patch("socket.gethostbyname", return_value="127.0.0.1"): assert validate_external_url("http://localhost/") is False + + def test_rfc1918_rejected_by_default(self): + for ip in ("10.0.0.1", "172.16.0.1", "192.168.1.1"): + with patch("socket.gethostbyname", return_value=ip): + assert validate_external_url("http://example.com/") is False, f"{ip} should be blocked" + + def test_rfc6598_cgn_rejected_by_default(self): + with patch("socket.gethostbyname", return_value="100.64.0.1"): assert validate_external_url("http://example.com/") is False - def test_localhost_allowed_when_requested(self): + # --- allow_loopback: permits 127.x/::1 only, not RFC 1918 --- + + def test_allow_loopback_permits_loopback(self): with patch("socket.gethostbyname", return_value="127.0.0.1"): - assert validate_external_url("http://localhost/", allow_localhost=True) is True + assert validate_external_url("http://localhost/", allow_loopback=True) is True + + def test_allow_loopback_still_blocks_rfc1918(self): + for ip in ("10.0.0.1", "172.16.0.1", "192.168.1.1"): + with patch("socket.gethostbyname", return_value=ip): + assert validate_external_url( + "http://example.com/", allow_loopback=True + ) is False, f"allow_loopback must not permit RFC 1918 addr {ip}" + + def test_allow_loopback_still_blocks_cgn(self): + with patch("socket.gethostbyname", return_value="100.64.0.1"): + assert validate_external_url("http://example.com/", allow_loopback=True) is False + + # --- allow_private: permits all internal ranges including loopback --- + + def test_allow_private_permits_loopback(self): + with patch("socket.gethostbyname", return_value="127.0.0.1"): + assert validate_external_url("http://localhost/", allow_private=True) is True + + def test_allow_private_permits_rfc1918(self): + for ip in ("10.0.0.1", "172.16.0.1", "192.168.1.1"): + with patch("socket.gethostbyname", return_value=ip): + assert validate_external_url( + "http://example.com/", allow_private=True + ) is True, f"allow_private must permit RFC 1918 addr {ip}" + + def test_allow_private_permits_cgn(self): + with patch("socket.gethostbyname", return_value="100.64.0.1"): + assert validate_external_url("http://example.com/", allow_private=True) is True + + def test_allow_private_permits_link_local(self): + with patch("socket.gethostbyname", return_value="169.254.0.1"): + assert validate_external_url("http://example.com/", allow_private=True) is True def test_missing_netloc_rejected(self): assert validate_external_url("http://") is False @@ -230,9 +272,9 @@ class TestValidateExternalUrlExtra: with patch("socket.gethostbyname", side_effect=socket.timeout("timeout")): assert validate_external_url("http://slow.example.com") is False - def test_allow_localhost_permits_loopback(self): + def test_allow_loopback_permits_loopback(self): with patch("socket.gethostbyname", return_value="127.0.0.1"): - result = validate_external_url("http://localhost", allow_localhost=True) + result = validate_external_url("http://localhost", allow_loopback=True) assert result is True @@ -249,3 +291,45 @@ class TestValidateSafePathExtra: with patch("modules.security_utils.Path.resolve", side_effect=OSError("disk fail")): with pytest.raises(ValueError, match="Invalid or unsafe file path"): validate_safe_path("some_file.db", base_dir=str(tmp_path)) + + +class TestSanitizeName: + """Tests for sanitize_name() — log-safe identifier sanitization.""" + + def test_newline_stripped(self): + from modules.security_utils import sanitize_name + assert "\n" not in sanitize_name("Evil\nNode") + + def test_carriage_return_stripped(self): + from modules.security_utils import sanitize_name + assert "\r" not in sanitize_name("Evil\rNode") + + def test_tab_stripped(self): + from modules.security_utils import sanitize_name + assert "\t" not in sanitize_name("Tab\tNode") + + def test_null_byte_stripped(self): + from modules.security_utils import sanitize_name + assert "\x00" not in sanitize_name("Bad\x00Name") + + def test_ansi_escape_stripped(self): + from modules.security_utils import sanitize_name + assert "\x1b" not in sanitize_name("\x1b[31mRed\x1b[0m") + + def test_truncated_to_max_length(self): + from modules.security_utils import sanitize_name + result = sanitize_name("A" * 100, max_length=64) + assert len(result) <= 64 + + def test_normal_name_unchanged(self): + from modules.security_utils import sanitize_name + assert sanitize_name("Alice") == "Alice" + + def test_non_string_coerced(self): + from modules.security_utils import sanitize_name + assert sanitize_name(42) == "42" + + def test_negative_max_length_raises(self): + from modules.security_utils import sanitize_name + with pytest.raises(ValueError): + sanitize_name("test", max_length=-1) diff --git a/tests/test_web_viewer.py b/tests/test_web_viewer.py index 8a6359e..58610ae 100644 --- a/tests/test_web_viewer.py +++ b/tests/test_web_viewer.py @@ -1026,6 +1026,56 @@ class TestConfigNotificationsRoutes: ) assert resp.status_code in (200, 400, 500) + def test_notifications_test_rejects_private_smtp_host(self, client, viewer): + """SSRF guard: RFC 6598 shared/CGN address (100.64.0.0/10) must be rejected.""" + viewer.db_manager.set_metadata('notif.smtp_host', '100.64.0.1') + viewer.db_manager.set_metadata('notif.smtp_port', '587') + viewer.db_manager.set_metadata('notif.smtp_security', 'starttls') + viewer.db_manager.set_metadata('notif.from_email', 'bot@example.com') + viewer.db_manager.set_metadata('notif.recipients', 'admin@example.com') + resp = client.post("/api/config/notifications/test") + assert resp.status_code == 400 + data = resp.get_json() + assert 'private' in data.get('error', '').lower() or 'reserved' in data.get('error', '').lower() + # reset + viewer.db_manager.set_metadata('notif.smtp_host', '') + + def test_notifications_test_rejects_loopback_smtp_host(self, client, viewer): + """SSRF guard: SMTP host of localhost/loopback must be rejected.""" + viewer.db_manager.set_metadata('notif.smtp_host', '127.0.0.1') + viewer.db_manager.set_metadata('notif.smtp_port', '25') + viewer.db_manager.set_metadata('notif.smtp_security', 'none') + viewer.db_manager.set_metadata('notif.from_email', 'bot@example.com') + viewer.db_manager.set_metadata('notif.recipients', 'admin@example.com') + resp = client.post("/api/config/notifications/test") + assert resp.status_code == 400 + data = resp.get_json() + assert 'private' in data.get('error', '').lower() or 'reserved' in data.get('error', '').lower() + # reset + viewer.db_manager.set_metadata('notif.smtp_host', '') + + def test_notifications_test_allows_local_smtp_when_flag_set(self, client, viewer): + """allow_local_smtp=true permits private-IP SMTP hosts (e.g. local Postfix).""" + viewer.db_manager.set_metadata('notif.smtp_host', '127.0.0.1') + viewer.db_manager.set_metadata('notif.smtp_port', '25') + viewer.db_manager.set_metadata('notif.smtp_security', 'none') + viewer.db_manager.set_metadata('notif.from_email', 'bot@example.com') + viewer.db_manager.set_metadata('notif.recipients', 'admin@example.com') + viewer.db_manager.set_metadata('notif.allow_local_smtp', 'true') + resp = client.post("/api/config/notifications/test") + # Must not be rejected with 400 for private address — may be 200 or 500 (send attempt) + assert resp.status_code != 400 or 'private' not in (resp.get_json() or {}).get('error', '').lower() + # reset + viewer.db_manager.set_metadata('notif.smtp_host', '') + viewer.db_manager.set_metadata('notif.allow_local_smtp', '') + + def test_notifications_get_includes_allow_local_smtp(self, client): + """GET /api/config/notifications must return allow_local_smtp field.""" + resp = client.get("/api/config/notifications") + assert resp.status_code == 200 + data = resp.get_json() + assert 'allow_local_smtp' in data + # =========================================================================== # Feed management API @@ -2920,6 +2970,7 @@ class TestDbPathResolutionFromConfigDir: ) +<<<<<<< HEAD class TestRadioDebugConfig: """Tests for GET/POST /api/config/radio-debug endpoints.""" @@ -2989,3 +3040,152 @@ class TestRadioDebugConfig: data = resp.get_json() assert data["success"] is True assert data["op_id"] is None +======= +# =========================================================================== +# Security: Restore endpoint path traversal prevention (GAP W1) +# =========================================================================== + + +class TestRestoreEndpointSecurity: + """POST /api/maintenance/restore must reject path traversal attacks. + + Covers GAP W1: validate_safe_path() added before shutil.copy2(). + Dangerous system directories and traversal patterns must return 400. + """ + + def test_etc_passwd_path_rejected(self, client): + """/etc/passwd supplied as db_file must be blocked.""" + resp = client.post( + "/api/maintenance/restore", + json={"db_file": "/etc/passwd"}, + content_type="application/json", + ) + assert resp.status_code == 400 + data = resp.get_json() + assert "error" in data + + def test_etc_shadow_path_rejected(self, client): + """/etc/shadow must be blocked (credential file).""" + resp = client.post( + "/api/maintenance/restore", + json={"db_file": "/etc/shadow"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_proc_self_environ_rejected(self, client): + """/proc/self/environ must be blocked (process secrets).""" + resp = client.post( + "/api/maintenance/restore", + json={"db_file": "/proc/self/environ"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_empty_db_file_rejected(self, client): + """Empty db_file must return 400.""" + resp = client.post( + "/api/maintenance/restore", + json={"db_file": ""}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_missing_db_file_key_rejected(self, client): + """Missing db_file key must return 400.""" + resp = client.post( + "/api/maintenance/restore", + json={}, + content_type="application/json", + ) + assert resp.status_code == 400 + + +# =========================================================================== +# Security: Feed preview/test SSRF prevention (GAP W2) +# =========================================================================== + + +class TestFeedPreviewSecurity: + """POST /api/feeds/preview and /api/feeds/test must reject SSRF-able URLs. + + Covers GAP W2: validate_external_url() added before any outbound fetch. + Private IPs, loopback, and file:// schemes must return 400. + """ + + def test_metadata_endpoint_blocked_in_preview(self, client): + """169.254.169.254 (cloud metadata) must be blocked in feed preview.""" + resp = client.post( + "/api/feeds/preview", + json={"feed_url": "http://169.254.169.254/latest/meta-data/", "feed_type": "rss"}, + content_type="application/json", + ) + assert resp.status_code == 400 + data = resp.get_json() + assert "error" in data + + def test_loopback_blocked_in_preview(self, client): + """127.0.0.1 loopback must be blocked in feed preview.""" + resp = client.post( + "/api/feeds/preview", + json={"feed_url": "http://127.0.0.1/internal", "feed_type": "rss"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_file_scheme_blocked_in_preview(self, client): + """file:// scheme must be blocked in feed preview.""" + resp = client.post( + "/api/feeds/preview", + json={"feed_url": "file:///etc/passwd", "feed_type": "rss"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_private_network_blocked_in_preview(self, client): + """10.x.x.x (private RFC1918) must be blocked in feed preview.""" + resp = client.post( + "/api/feeds/preview", + json={"feed_url": "http://10.0.0.1/feed.xml", "feed_type": "rss"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_missing_feed_url_rejected(self, client): + """Missing feed_url key must return 400.""" + resp = client.post( + "/api/feeds/preview", + json={"feed_type": "rss"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_metadata_endpoint_blocked_in_test_route(self, client): + """169.254.169.254 must be blocked in /api/feeds/test as well.""" + resp = client.post( + "/api/feeds/test", + json={"url": "http://169.254.169.254/metadata"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_loopback_blocked_in_test_route(self, client): + """127.0.0.1 must be blocked in /api/feeds/test.""" + resp = client.post( + "/api/feeds/test", + json={"url": "http://127.0.0.1/internal"}, + content_type="application/json", + ) + assert resp.status_code == 400 + + def test_validate_external_url_called_for_preview(self, client): + """validate_external_url is invoked — not bypassed — in the preview handler.""" + with patch("modules.web_viewer.app.validate_external_url", return_value=False) as mock_veu: + resp = client.post( + "/api/feeds/preview", + json={"feed_url": "http://192.168.1.1/feed.rss", "feed_type": "rss"}, + content_type="application/json", + ) + mock_veu.assert_called() + assert resp.status_code == 400 +>>>>>>> 38d040a (security: SSRF hardening, log injection sanitization, and allow_local_smtp)