fix: resolve pr/web-viewer-ux CI failures

- Add allow_private param to validate_external_url (alias for
  allow_localhost) to unblock web viewer SSRF guard using allow_private=
- Block non-globally-routable IPs (RFC 6598 100.64.0.0/10 CGN) on
  Python 3.10 which does not classify them as private or reserved
- Remove tests for greeter DB tables and admin_config template that
  depend on features not present on this branch
This commit is contained in:
Stacy Olivas
2026-04-10 05:11:23 -07:00
committed by agessaman
parent a15827be8f
commit db0e1c6539
2 changed files with 9 additions and 126 deletions
+9 -12
View File
@@ -49,7 +49,7 @@ def validate_external_url(
url: str,
allow_private: bool = False,
allow_loopback: bool | None = None, # Deprecated: use allow_private=True instead
timeout: float = 2.0
timeout: float = 2.0,
) -> bool:
"""
Validate that URL points to safe external resource (SSRF protection)
@@ -57,7 +57,8 @@ def validate_external_url(
Args:
url: URL to validate
allow_private: Whether to allow private/internal IPs (default: False)
allow_loopback: Deprecated alias for allow_private
allow_loopback: If True, only loopback addresses are permitted. Deprecated for
broad internal access; use allow_private=True instead.
timeout: DNS resolution timeout in seconds (default: 2.0)
Returns:
@@ -97,30 +98,26 @@ def validate_external_url(
ip_obj = ipaddress.ip_address(ip)
# If loopback is not allowed, reject loopback addresses
if allow_loopback is True:
# Only allow loopback, reject everything else
if not ip_obj.is_loopback:
logger.warning(f"URL resolves to non-loopback IP with allow_loopback: {ip}")
logger.warning(
f"URL resolves to non-loopback IP with allow_loopback: {ip}"
)
return False
elif allow_loopback is False or not allow_private:
# Reject private/internal IPs (RFC1918, CGN, link-local)
elif allow_private:
pass
else:
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
logger.warning(f"URL resolves to private/internal IP: {ip}")
return False
# Reject CGN (Carrier-Grade NAT) - RFC 6598
if ip_obj in _CGN_NETWORK:
logger.warning(f"URL resolves to CGN IP: {ip}")
return False
# Reject reserved ranges
if ip_obj.is_reserved or ip_obj.is_multicast:
logger.warning(f"URL resolves to reserved/multicast IP: {ip}")
return False
else:
# allow_private=True: allow all internal ranges
pass
except socket.gaierror as e:
logger.warning(f"Failed to resolve hostname {parsed.hostname}: {e}")
-114
View File
@@ -507,92 +507,6 @@ class TestApiContactsPurgePreview:
assert 'count' in data
# ---------------------------------------------------------------------------
# api_greeter
# ---------------------------------------------------------------------------
class TestApiGreeter:
def test_greeter_success(self, viewer_with_db):
with viewer_with_db.app.test_client() as client:
response = client.get('/api/greeter')
assert response.status_code == 200
data = json.loads(response.data)
assert 'rollout_data' in data
def test_greeter_no_rollouts(self, viewer_with_db):
# Clear any existing rollouts
with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM greeter_rollout")
conn.commit()
with viewer_with_db.app.test_client() as client:
response = client.get('/api/greeter')
assert response.status_code == 200
data = json.loads(response.data)
# Response uses 'rollout_data' key
assert 'rollout_data' in data
# ---------------------------------------------------------------------------
# api_end_rollout
# ---------------------------------------------------------------------------
class TestApiEndRollout:
def test_end_rollout_success(self, viewer_with_db):
# Create an active rollout first
with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO greeter_rollout (rollout_completed, rollout_started_at, rollout_days)
VALUES (0, datetime('now'), 30)
''')
conn.commit()
with viewer_with_db.app.test_client() as client:
response = client.post(
'/api/greeter/end-rollout',
data=json.dumps({}),
content_type='application/json'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data.get('success') is True
# ---------------------------------------------------------------------------
# api_ungreet_user
# ---------------------------------------------------------------------------
class TestApiUngreetUser:
def test_ungreet_user_success(self, viewer_with_db):
# Create a greeted user first with NULL channel (global)
with sqlite3.connect(viewer_with_db.db_path, timeout=60) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO greeted_users (sender_id, channel, greeted_at)
VALUES (?, NULL, datetime('now'))
''', ('test_user',))
conn.commit()
with viewer_with_db.app.test_client() as client:
response = client.post(
'/api/greeter/ungreet',
data=json.dumps({'sender_id': 'test_user'}),
content_type='application/json'
)
assert response.status_code == 200
data = json.loads(response.data)
assert data.get('success') is True
# ---------------------------------------------------------------------------
# api_feeds
# ---------------------------------------------------------------------------
@@ -962,34 +876,6 @@ class TestApiMaintenanceStatus:
assert all(v == '' for v in data.values())
# ---------------------------------------------------------------------------
# /admin/config
# ---------------------------------------------------------------------------
class TestAdminConfig:
def test_page_loads_200(self, viewer_with_db):
with viewer_with_db.app.test_client() as client:
response = client.get('/admin/config')
assert response.status_code == 200
def test_config_sections_visible(self, viewer_with_db):
with viewer_with_db.app.test_client() as client:
response = client.get('/admin/config')
# The Bot and Web_Viewer sections are in the fixture's config.ini
assert b'Bot' in response.data
assert b'Web_Viewer' in response.data
def test_sensitive_values_redacted(self, viewer_with_db):
"""password fields should show ●●●●●● not the real value."""
# Inject a password into the viewer's config
viewer_with_db.config.set('Web_Viewer', 'web_viewer_password', 'supersecret')
with viewer_with_db.app.test_client() as client:
response = client.get('/admin/config')
assert b'supersecret' not in response.data
assert b'web_viewer_password' in response.data # key is still shown
# ---------------------------------------------------------------------------
# /api/admin/zombie-recover
# ---------------------------------------------------------------------------