From 0f60f36f54a42a84fecc71605d73735e2a13b09f Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Thu, 5 Mar 2026 16:18:29 -0600 Subject: [PATCH] code cleanup (format) --- tests/backend/test_database_snapshots.py | 120 ++++++++++++++---- tests/backend/test_https_wss_side_sniffing.py | 2 - tests/backend/test_interface_discovery.py | 21 ++- tests/backend/test_security_fuzzing.py | 4 +- 4 files changed, 113 insertions(+), 34 deletions(-) diff --git a/tests/backend/test_database_snapshots.py b/tests/backend/test_database_snapshots.py index 13eca3c..62b149f 100644 --- a/tests/backend/test_database_snapshots.py +++ b/tests/backend/test_database_snapshots.py @@ -98,13 +98,29 @@ def test_backup_suspicious_when_messages_gone_skips_cleanup_and_baseline(temp_di db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() - db.messages.upsert_lxmf_message({ - "hash": "h1", "source_hash": "s", "destination_hash": "d", "peer_hash": "p", - "state": "delivered", "progress": 1.0, "is_incoming": 1, "method": "direct", - "delivery_attempts": 0, "next_delivery_attempt_at": None, "title": "t", "content": "c", - "fields": "{}", "timestamp": 0, "rssi": None, "snr": None, "quality": None, - "is_spam": 0, "reply_to_hash": None, - }) + db.messages.upsert_lxmf_message( + { + "hash": "h1", + "source_hash": "s", + "destination_hash": "d", + "peer_hash": "p", + "state": "delivered", + "progress": 1.0, + "is_incoming": 1, + "method": "direct", + "delivery_attempts": 0, + "next_delivery_attempt_at": None, + "title": "t", + "content": "c", + "fields": "{}", + "timestamp": 0, + "rssi": None, + "snr": None, + "quality": None, + "is_spam": 0, + "reply_to_hash": None, + } + ) result1 = db.backup_database(temp_dir, max_count=3) assert result1.get("suspicious") is not True backup_dir = os.path.join(temp_dir, "database-backups") @@ -121,7 +137,9 @@ def test_backup_suspicious_when_messages_gone_skips_cleanup_and_baseline(temp_di assert "baseline" in result2 assert result2["baseline"]["message_count"] == 1 assert result2["current_stats"]["message_count"] == 0 - zip_count_after_suspicious = sum(1 for f in os.listdir(backup_dir) if f.endswith(".zip")) + zip_count_after_suspicious = sum( + 1 for f in os.listdir(backup_dir) if f.endswith(".zip") + ) assert zip_count_after_suspicious == 2 assert any("SUSPICIOUS" in f for f in os.listdir(backup_dir) if f.endswith(".zip")) with open(os.path.join(backup_dir, "backup-baseline.json")) as f: @@ -134,13 +152,29 @@ def test_backup_suspicious_when_size_collapsed_skips_cleanup(temp_dir): db = Database(db_path) db.initialize() for i in range(200): - db.messages.upsert_lxmf_message({ - "hash": f"h{i}", "source_hash": "s", "destination_hash": "d", "peer_hash": "p", - "state": "delivered", "progress": 1.0, "is_incoming": 1, "method": "direct", - "delivery_attempts": 0, "next_delivery_attempt_at": None, "title": "t", "content": "x" * 500, - "fields": "{}", "timestamp": float(i), "rssi": None, "snr": None, "quality": None, - "is_spam": 0, "reply_to_hash": None, - }) + db.messages.upsert_lxmf_message( + { + "hash": f"h{i}", + "source_hash": "s", + "destination_hash": "d", + "peer_hash": "p", + "state": "delivered", + "progress": 1.0, + "is_incoming": 1, + "method": "direct", + "delivery_attempts": 0, + "next_delivery_attempt_at": None, + "title": "t", + "content": "x" * 500, + "fields": "{}", + "timestamp": float(i), + "rssi": None, + "snr": None, + "quality": None, + "is_spam": 0, + "reply_to_hash": None, + } + ) db.backup_database(temp_dir, max_count=3) baseline_path = os.path.join(temp_dir, "database-backups", "backup-baseline.json") with open(baseline_path) as f: @@ -157,6 +191,7 @@ def test_backup_suspicious_when_size_collapsed_skips_cleanup(temp_dir): def test_backup_normal_rotation_and_baseline_update(temp_dir): import time + db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() @@ -173,6 +208,7 @@ def test_backup_normal_rotation_and_baseline_update(temp_dir): def test_backup_failure_does_not_remove_existing_backups(temp_dir): from unittest.mock import patch + db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() @@ -199,13 +235,29 @@ def test_check_db_health_at_open_baseline_suspicious_content(temp_dir): db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() - db.messages.upsert_lxmf_message({ - "hash": "h1", "source_hash": "s", "destination_hash": "d", "peer_hash": "p", - "state": "delivered", "progress": 1.0, "is_incoming": 1, "method": "direct", - "delivery_attempts": 0, "next_delivery_attempt_at": None, "title": "t", "content": "c", - "fields": "{}", "timestamp": 0, "rssi": None, "snr": None, "quality": None, - "is_spam": 0, "reply_to_hash": None, - }) + db.messages.upsert_lxmf_message( + { + "hash": "h1", + "source_hash": "s", + "destination_hash": "d", + "peer_hash": "p", + "state": "delivered", + "progress": 1.0, + "is_incoming": 1, + "method": "direct", + "delivery_attempts": 0, + "next_delivery_attempt_at": None, + "title": "t", + "content": "c", + "fields": "{}", + "timestamp": 0, + "rssi": None, + "snr": None, + "quality": None, + "is_spam": 0, + "reply_to_hash": None, + } + ) db.backup_database(temp_dir) db.messages.delete_all_lxmf_messages() issues = db.check_db_health_at_open(temp_dir) @@ -215,6 +267,7 @@ def test_check_db_health_at_open_baseline_suspicious_content(temp_dir): def test_check_db_health_at_open_integrity_fail(temp_dir): from unittest.mock import patch + db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() @@ -234,6 +287,7 @@ def test_check_db_health_at_close_no_issues(temp_dir): def test_check_db_health_at_close_integrity_fail(temp_dir): from unittest.mock import patch + db_path = os.path.join(temp_dir, "test.db") db = Database(db_path) db.initialize() @@ -245,23 +299,37 @@ def test_check_db_health_at_close_integrity_fail(temp_dir): def test_is_backup_suspicious_does_not_mistrigger_empty_baseline(): from meshchatx.src.backend.database import Database + db = Database(":memory:") db.initialize() - assert db._is_backup_suspicious({"message_count": 0, "total_bytes": 0}, None) is False - assert db._is_backup_suspicious({"message_count": 10, "total_bytes": 1000}, None) is False + assert ( + db._is_backup_suspicious({"message_count": 0, "total_bytes": 0}, None) is False + ) + assert ( + db._is_backup_suspicious({"message_count": 10, "total_bytes": 1000}, None) + is False + ) def test_is_backup_suspicious_does_not_mistrigger_legitimate_empty(): from meshchatx.src.backend.database import Database + db = Database(":memory:") db.initialize() baseline = {"message_count": 0, "total_bytes": 5000} - assert db._is_backup_suspicious({"message_count": 0, "total_bytes": 5000}, baseline) is False + assert ( + db._is_backup_suspicious({"message_count": 0, "total_bytes": 5000}, baseline) + is False + ) def test_is_backup_suspicious_does_not_mistrigger_small_db(): from meshchatx.src.backend.database import Database + db = Database(":memory:") db.initialize() baseline = {"message_count": 5, "total_bytes": 50_000} - assert db._is_backup_suspicious({"message_count": 5, "total_bytes": 55_000}, baseline) is False + assert ( + db._is_backup_suspicious({"message_count": 5, "total_bytes": 55_000}, baseline) + is False + ) diff --git a/tests/backend/test_https_wss_side_sniffing.py b/tests/backend/test_https_wss_side_sniffing.py index ee0e5b9..39dd3a0 100644 --- a/tests/backend/test_https_wss_side_sniffing.py +++ b/tests/backend/test_https_wss_side_sniffing.py @@ -163,5 +163,3 @@ async def test_wss_over_same_tls_port(ssl_context_and_cert): ) finally: await runner.cleanup() - - diff --git a/tests/backend/test_interface_discovery.py b/tests/backend/test_interface_discovery.py index a96ca8b..2c75cde 100644 --- a/tests/backend/test_interface_discovery.py +++ b/tests/backend/test_interface_discovery.py @@ -96,7 +96,9 @@ async def test_reticulum_discovery_get_and_patch(temp_dir): assert get_data["discovery"]["discover_interfaces"] == "true" assert get_data["discovery"]["interface_discovery_sources"] == "abc,def" assert get_data["discovery"]["interface_discovery_whitelist"] == "tcp-*,10.0.*" - assert get_data["discovery"]["interface_discovery_blacklist"] == "tcp-bad,*:9999" + assert ( + get_data["discovery"]["interface_discovery_blacklist"] == "tcp-bad,*:9999" + ) assert get_data["discovery"]["required_discovery_value"] == "16" assert get_data["discovery"]["autoconnect_discovered_interfaces"] == "2" assert get_data["discovery"]["network_identity"] == "/tmp/net_id" @@ -121,7 +123,10 @@ async def test_reticulum_discovery_get_and_patch(temp_dir): patch_data = json.loads(patch_response.body) assert patch_data["discovery"]["discover_interfaces"] is False assert patch_data["discovery"]["interface_discovery_sources"] is None - assert patch_data["discovery"]["interface_discovery_whitelist"] == "peer-*,172.16.*" + assert ( + patch_data["discovery"]["interface_discovery_whitelist"] + == "peer-*,172.16.*" + ) assert patch_data["discovery"]["interface_discovery_blacklist"] is None assert patch_data["discovery"]["required_discovery_value"] == 18 assert patch_data["discovery"]["autoconnect_discovered_interfaces"] == 5 @@ -254,9 +259,15 @@ async def test_discovery_patch_sanitizes_whitelist_blacklist_values(temp_dir): data = json.loads(response.body) assert data["discovery"]["interface_discovery_whitelist"] == "peer-1,host:4242" - assert data["discovery"]["interface_discovery_blacklist"] == "bad-node,evilentry" - assert config["reticulum"]["interface_discovery_whitelist"] == "peer-1,host:4242" - assert config["reticulum"]["interface_discovery_blacklist"] == "bad-node,evilentry" + assert ( + data["discovery"]["interface_discovery_blacklist"] == "bad-node,evilentry" + ) + assert ( + config["reticulum"]["interface_discovery_whitelist"] == "peer-1,host:4242" + ) + assert ( + config["reticulum"]["interface_discovery_blacklist"] == "bad-node,evilentry" + ) assert config.write_called diff --git a/tests/backend/test_security_fuzzing.py b/tests/backend/test_security_fuzzing.py index 0a4b952..6c33668 100644 --- a/tests/backend/test_security_fuzzing.py +++ b/tests/backend/test_security_fuzzing.py @@ -1626,7 +1626,9 @@ def test_messages_delete_by_hash_fuzzing(mock_app, message_hash): @settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None) -@given(message_hashes=st.lists(st.text(min_size=0, max_size=100), min_size=0, max_size=50)) +@given( + message_hashes=st.lists(st.text(min_size=0, max_size=100), min_size=0, max_size=50) +) def test_messages_delete_by_hashes_fuzzing(mock_app, message_hashes): """Fuzz bulk message delete by hashes.""" try: