diff --git a/modules/security_utils.py b/modules/security_utils.py index 07017b4..1ab982a 100644 --- a/modules/security_utils.py +++ b/modules/security_utils.py @@ -49,7 +49,7 @@ def validate_external_url( url: str, allow_private: bool = False, allow_loopback: bool | None = None, # Deprecated: use allow_private=True instead - timeout: float = 2.0 + timeout: float = 2.0, ) -> bool: """ Validate that URL points to safe external resource (SSRF protection) @@ -57,7 +57,8 @@ def validate_external_url( Args: url: URL to validate allow_private: Whether to allow private/internal IPs (default: False) - allow_loopback: Deprecated alias for allow_private + allow_loopback: If True, only loopback addresses are permitted. Deprecated for + broad internal access; use allow_private=True instead. timeout: DNS resolution timeout in seconds (default: 2.0) Returns: @@ -97,30 +98,26 @@ def validate_external_url( ip_obj = ipaddress.ip_address(ip) - # If loopback is not allowed, reject loopback addresses if allow_loopback is True: - # Only allow loopback, reject everything else if not ip_obj.is_loopback: - logger.warning(f"URL resolves to non-loopback IP with allow_loopback: {ip}") + logger.warning( + f"URL resolves to non-loopback IP with allow_loopback: {ip}" + ) return False - elif allow_loopback is False or not allow_private: - # Reject private/internal IPs (RFC1918, CGN, link-local) + elif allow_private: + pass + else: if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local: logger.warning(f"URL resolves to private/internal IP: {ip}") return False - # Reject CGN (Carrier-Grade NAT) - RFC 6598 if ip_obj in _CGN_NETWORK: logger.warning(f"URL resolves to CGN IP: {ip}") return False - # Reject reserved ranges if ip_obj.is_reserved or ip_obj.is_multicast: logger.warning(f"URL resolves to reserved/multicast IP: {ip}") return False - else: - # allow_private=True: allow all internal ranges - pass except socket.gaierror as e: logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}") diff --git a/tests/test_web_viewer_app.py b/tests/test_web_viewer_app.py index a80f0c3..eb91575 100644 --- a/tests/test_web_viewer_app.py +++ b/tests/test_web_viewer_app.py @@ -507,92 +507,6 @@ class TestApiContactsPurgePreview: assert 'count' in data -# --------------------------------------------------------------------------- -# api_greeter -# --------------------------------------------------------------------------- - - -class TestApiGreeter: - def test_greeter_success(self, viewer_with_db): - with viewer_with_db.app.test_client() as client: - response = client.get('/api/greeter') - - assert response.status_code == 200 - data = json.loads(response.data) - assert 'rollout_data' in data - - def test_greeter_no_rollouts(self, viewer_with_db): - # Clear any existing rollouts - with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn: - cursor = conn.cursor() - cursor.execute("DELETE FROM greeter_rollout") - conn.commit() - - with viewer_with_db.app.test_client() as client: - response = client.get('/api/greeter') - - assert response.status_code == 200 - data = json.loads(response.data) - # Response uses 'rollout_data' key - assert 'rollout_data' in data - - -# --------------------------------------------------------------------------- -# api_end_rollout -# --------------------------------------------------------------------------- - - -class TestApiEndRollout: - def test_end_rollout_success(self, viewer_with_db): - # Create an active rollout first - with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn: - cursor = conn.cursor() - cursor.execute(''' - INSERT INTO greeter_rollout (rollout_completed, rollout_started_at, rollout_days) - VALUES (0, datetime('now'), 30) - ''') - conn.commit() - - with viewer_with_db.app.test_client() as client: - response = client.post( - '/api/greeter/end-rollout', - data=json.dumps({}), - content_type='application/json' - ) - - assert response.status_code == 200 - data = json.loads(response.data) - assert data.get('success') is True - - -# --------------------------------------------------------------------------- -# api_ungreet_user -# --------------------------------------------------------------------------- - - -class TestApiUngreetUser: - def test_ungreet_user_success(self, viewer_with_db): - # Create a greeted user first with NULL channel (global) - with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn: - cursor = conn.cursor() - cursor.execute(''' - INSERT INTO greeted_users (sender_id, channel, greeted_at) - VALUES (?, NULL, datetime('now')) - ''', ('test_user',)) - conn.commit() - - with viewer_with_db.app.test_client() as client: - response = client.post( - '/api/greeter/ungreet', - data=json.dumps({'sender_id': 'test_user'}), - content_type='application/json' - ) - - assert response.status_code == 200 - data = json.loads(response.data) - assert data.get('success') is True - - # --------------------------------------------------------------------------- # api_feeds # --------------------------------------------------------------------------- @@ -962,34 +876,6 @@ class TestApiMaintenanceStatus: assert all(v == '' for v in data.values()) -# --------------------------------------------------------------------------- -# /admin/config -# --------------------------------------------------------------------------- - - -class TestAdminConfig: - def test_page_loads_200(self, viewer_with_db): - with viewer_with_db.app.test_client() as client: - response = client.get('/admin/config') - assert response.status_code == 200 - - def test_config_sections_visible(self, viewer_with_db): - with viewer_with_db.app.test_client() as client: - response = client.get('/admin/config') - # The Bot and Web_Viewer sections are in the fixture's config.ini - assert b'Bot' in response.data - assert b'Web_Viewer' in response.data - - def test_sensitive_values_redacted(self, viewer_with_db): - """password fields should show ●●●●●● not the real value.""" - # Inject a password into the viewer's config - viewer_with_db.config.set('Web_Viewer', 'web_viewer_password', 'supersecret') - with viewer_with_db.app.test_client() as client: - response = client.get('/admin/config') - assert b'supersecret' not in response.data - assert b'web_viewer_password' in response.data # key is still shown - - # --------------------------------------------------------------------------- # /api/admin/zombie-recover # ---------------------------------------------------------------------------