From 2b6616f162846ea188ae940f2e1aead5ca10ffaf Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Fri, 6 Mar 2026 11:36:24 -0600 Subject: [PATCH] Add extensive tests for CrashRecovery and IntegrityManager --- tests/backend/test_crash_recovery.py | 207 +++++++++++++++++++ tests/backend/test_integrity_extensive.py | 239 ++++++++++++++++++++++ tests/backend/test_property_based.py | 98 +++++++++ 3 files changed, 544 insertions(+) diff --git a/tests/backend/test_crash_recovery.py b/tests/backend/test_crash_recovery.py index b8a4455..a1c50d6 100644 --- a/tests/backend/test_crash_recovery.py +++ b/tests/backend/test_crash_recovery.py @@ -321,6 +321,213 @@ class TestCrashRecovery(unittest.TestCase): self.assertGreaterEqual(legacy_cause["probability"], 80) self.assertIn("Kernel detected: 3.10", legacy_cause["reasoning"]) + # ================================================================== + # install() / disable() + # ================================================================== + + def test_install_sets_excepthook(self): + """install() should set sys.excepthook to handle_exception.""" + original = sys.excepthook + try: + self.recovery.install() + self.assertEqual(sys.excepthook, self.recovery.handle_exception) + finally: + sys.excepthook = original + + def test_disable_prevents_install(self): + """After disable(), install() should not set the hook.""" + original = sys.excepthook + try: + self.recovery.disable() + self.recovery.install() + self.assertNotEqual(sys.excepthook, self.recovery.handle_exception) + finally: + sys.excepthook = original + + # ================================================================== + # _calculate_manifold_curvature + # ================================================================== + + def test_curvature_empty_causes(self): + """Empty causes list should return 0.0 curvature.""" + self.assertEqual(self.recovery._calculate_manifold_curvature([]), 0.0) + + def test_curvature_single_cause(self): + """Single cause should return probability * 10.""" + causes = [{"probability": 95}] + self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 9.5) + + def test_curvature_two_causes_gradient(self): + """Curvature should be 10 * (top - second).""" + causes = [{"probability": 90}, {"probability": 40}] + self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 5.0) + + def test_curvature_equal_causes(self): + """Equal probabilities should give curvature = 0.""" + causes = [{"probability": 50}, {"probability": 50}] + self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 0.0) + + def test_curvature_many_causes(self): + """Only top 2 probabilities matter for curvature.""" + causes = [ + {"probability": 80}, + {"probability": 30}, + {"probability": 10}, + ] + self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 5.0) + + # ================================================================== + # _calculate_system_entropy edge cases + # ================================================================== + + def test_entropy_empty_diagnosis(self): + """Empty diagnosis should return baseline entropy (all ideal).""" + entropy, divergence = self.recovery._calculate_system_entropy({}) + self.assertGreater(entropy, 0) + self.assertAlmostEqual(divergence, 0.0, places=5) + + def test_entropy_all_critical(self): + """All dimensions critical should maximize entropy and divergence.""" + diag = { + "low_memory": True, + "config_missing": True, + "db_type": "memory", + } + entropy, divergence = self.recovery._calculate_system_entropy(diag) + base_entropy, base_div = self.recovery._calculate_system_entropy({}) + self.assertGreater(entropy, base_entropy) + self.assertGreater(divergence, base_div) + + def test_entropy_invalid_mem_value(self): + """Non-numeric available_mem_mb should not crash.""" + diag = {"available_mem_mb": "not_a_number", "low_memory": True} + entropy, divergence = self.recovery._calculate_system_entropy(diag) + self.assertIsInstance(entropy, float) + self.assertIsInstance(divergence, float) + + def test_entropy_none_mem_value(self): + """None available_mem_mb should not crash.""" + diag = {"available_mem_mb": None} + entropy, divergence = self.recovery._calculate_system_entropy(diag) + self.assertIsInstance(entropy, float) + + def test_divergence_nonnegative(self): + """KL divergence must always be non-negative.""" + test_cases = [ + {}, + {"low_memory": True}, + {"config_missing": True}, + {"db_type": "memory"}, + {"low_memory": True, "config_missing": True, "db_type": "memory"}, + ] + for diag in test_cases: + _, div = self.recovery._calculate_system_entropy(diag) + self.assertGreaterEqual(div, 0.0, f"Negative divergence for {diag}") + + # ================================================================== + # legacy_kernel regex safety + # ================================================================== + + def test_legacy_kernel_non_linux(self): + """Non-Linux platforms should not crash on legacy_kernel check.""" + with ( + patch("platform.system", return_value="Windows"), + patch("platform.release", return_value="10"), + ): + exc_type = RuntimeError + exc_value = RuntimeError("test") + causes = self.recovery._analyze_cause(exc_type, exc_value, {}) + self.assertIsInstance(causes, list) + + def test_legacy_kernel_unusual_release(self): + """Unusual release strings should not crash.""" + with ( + patch("platform.system", return_value="Linux"), + patch("platform.release", return_value="unknown"), + ): + exc_type = RuntimeError + exc_value = RuntimeError("test") + causes = self.recovery._analyze_cause(exc_type, exc_value, {}) + self.assertIsInstance(causes, list) + + def test_legacy_kernel_empty_release(self): + """Empty release string should not crash.""" + with ( + patch("platform.system", return_value="Linux"), + patch("platform.release", return_value=""), + ): + exc_type = RuntimeError + exc_value = RuntimeError("test") + causes = self.recovery._analyze_cause(exc_type, exc_value, {}) + self.assertIsInstance(causes, list) + + # ================================================================== + # run_reticulum_diagnosis isolation + # ================================================================== + + def test_reticulum_diagnosis_invalid_config_content(self): + """Invalid config file content should be flagged.""" + rns_dir = os.path.join(self.test_dir, "rns_invalid") + os.makedirs(rns_dir) + with open(os.path.join(rns_dir, "config"), "w") as f: + f.write("this is not a valid reticulum config") + + self.recovery.update_paths(reticulum_config_dir=rns_dir) + output = io.StringIO() + results = self.recovery.run_reticulum_diagnosis(file=output) + report = output.getvalue() + self.assertIn("[ERROR]", report) + self.assertTrue(results.get("config_invalid", False)) + + def test_reticulum_diagnosis_empty_logfile(self): + """Empty log file should be handled gracefully.""" + rns_dir = os.path.join(self.test_dir, "rns_empty_log") + os.makedirs(rns_dir) + with open(os.path.join(rns_dir, "config"), "w") as f: + f.write("[reticulum]\n") + with open(os.path.join(rns_dir, "logfile"), "w") as f: + pass + + self.recovery.update_paths(reticulum_config_dir=rns_dir) + output = io.StringIO() + self.recovery.run_reticulum_diagnosis(file=output) + report = output.getvalue() + self.assertIn("Log file is empty", report) + + # ================================================================== + # Diagnosis edge cases + # ================================================================== + + def test_diagnosis_empty_db_file(self): + """0-byte database file should trigger a warning.""" + open(self.db_path, "w").close() # noqa: SIM115 + + output = io.StringIO() + self.recovery.run_diagnosis(file=output) + report = output.getvalue() + self.assertIn("empty (0 bytes)", report) + + def test_update_paths(self): + """update_paths should correctly update internal state.""" + new_storage = os.path.join(self.test_dir, "new_storage") + self.recovery.update_paths(storage_dir=new_storage) + self.assertEqual(self.recovery.storage_dir, new_storage) + + def test_keyboard_interrupt_passthrough(self): + """KeyboardInterrupt should be passed to the default hook.""" + called = {} + + def mock_hook(exc_type, exc_value, exc_tb): + called["invoked"] = True + + original = sys.__excepthook__ + sys.__excepthook__ = mock_hook + try: + self.recovery.handle_exception(KeyboardInterrupt, KeyboardInterrupt(), None) + self.assertTrue(called.get("invoked", False)) + finally: + sys.__excepthook__ = original + if __name__ == "__main__": unittest.main() diff --git a/tests/backend/test_integrity_extensive.py b/tests/backend/test_integrity_extensive.py index 14202a4..3038bb1 100644 --- a/tests/backend/test_integrity_extensive.py +++ b/tests/backend/test_integrity_extensive.py @@ -183,6 +183,245 @@ class TestIntegrityManagerExtensive(unittest.TestCase): if not is_ok: self.assertTrue(any("Database" in i for i in issues)) + # ------------------------------------------------------------------ + # Corrupt / malformed manifest + # ------------------------------------------------------------------ + + def test_corrupt_manifest_json(self): + """check_integrity must not crash on invalid JSON in the manifest.""" + self.manager.save_manifest() + with open(self.manager.manifest_path, "w") as f: + f.write("{{{NOT JSON!!!") + + is_ok, issues = self.manager.check_integrity() + self.assertFalse(is_ok) + self.assertTrue(any("Integrity check failed" in i for i in issues)) + + def test_empty_manifest_file(self): + """check_integrity must handle a 0-byte manifest gracefully.""" + self.manager.save_manifest() + with open(self.manager.manifest_path, "w") as f: + f.truncate(0) + + is_ok, issues = self.manager.check_integrity() + self.assertFalse(is_ok) + self.assertTrue(any("Integrity check failed" in i for i in issues)) + + def test_manifest_missing_keys(self): + """Manifest with valid JSON but missing expected keys should not crash.""" + with open(self.manager.manifest_path, "w") as f: + json.dump({"version": 2}, f) + + is_ok, issues = self.manager.check_integrity() + self.assertTrue(is_ok or isinstance(issues, list)) + + # ------------------------------------------------------------------ + # Hash consistency + # ------------------------------------------------------------------ + + def test_hash_file_consistency(self): + """Same file content must always produce the same hash.""" + h1 = self.manager._hash_file(self.db_path) + h2 = self.manager._hash_file(self.db_path) + self.assertEqual(h1, h2) + self.assertIsNotNone(h1) + self.assertEqual(len(h1), 64) # SHA-256 hex length + + def test_hash_file_missing(self): + """_hash_file on a missing path must return None.""" + result = self.manager._hash_file(self.test_dir / "does_not_exist.bin") + self.assertIsNone(result) + + # ------------------------------------------------------------------ + # DB integrity check + # ------------------------------------------------------------------ + + def test_check_db_integrity_valid(self): + """_check_db_integrity on a valid DB returns (True, 'ok').""" + ok, msg = self.manager._check_db_integrity(self.db_path) + self.assertTrue(ok) + self.assertEqual(msg, "ok") + + def test_check_db_integrity_not_sqlite(self): + """_check_db_integrity on a non-SQLite file returns (False, ...).""" + bad = self.test_dir / "bad.db" + bad.write_text("this is not sqlite") + ok, msg = self.manager._check_db_integrity(bad) + self.assertFalse(ok) + + def test_check_db_integrity_missing(self): + """_check_db_integrity on missing file returns (False, ...).""" + ok, msg = self.manager._check_db_integrity(self.test_dir / "gone.db") + self.assertFalse(ok) + self.assertIn("does not exist", msg) + + def test_check_db_integrity_empty_file(self): + """_check_db_integrity on a 0-byte file should not crash.""" + empty_db = self.test_dir / "empty.db" + empty_db.touch() + ok, msg = self.manager._check_db_integrity(empty_db) + # SQLite treats a 0-byte file as a valid empty database + self.assertIsInstance(ok, bool) + self.assertIsInstance(msg, str) + + # ------------------------------------------------------------------ + # Entropy threshold boundaries + # ------------------------------------------------------------------ + + def test_entropy_threshold_db_just_below(self): + """DB entropy delta of 0.99 should NOT trigger anomaly warning.""" + self.manager.save_manifest() + + with open(self.manager.manifest_path) as f: + manifest = json.load(f) + + db_rel = str(self.db_path.relative_to(self.test_dir)) + real_entropy = manifest["metadata"][db_rel]["entropy"] + manifest["metadata"][db_rel]["entropy"] = real_entropy + 0.99 + + with open(self.manager.manifest_path, "w") as f: + json.dump(manifest, f) + + is_ok, issues = self.manager.check_integrity() + self.assertFalse( + any("structural anomaly" in i for i in issues), + f"Should not flag anomaly at delta=0.99: {issues}", + ) + + def test_entropy_threshold_db_just_above(self): + """DB entropy delta of 1.01 should trigger anomaly warning.""" + self.manager.save_manifest() + + with open(self.manager.manifest_path) as f: + manifest = json.load(f) + + db_rel = str(self.db_path.relative_to(self.test_dir)) + real_entropy = manifest["metadata"][db_rel]["entropy"] + manifest["metadata"][db_rel]["entropy"] = real_entropy + 1.01 + + # Also change the file hash so it triggers the comparison path + manifest["files"][db_rel] = "0" * 64 + + with open(self.manager.manifest_path, "w") as f: + json.dump(manifest, f) + + is_ok, issues = self.manager.check_integrity() + self.assertFalse(is_ok) + self.assertTrue( + any("structural anomaly" in i or "Entropy" in i for i in issues), + f"Should flag anomaly at delta=1.01: {issues}", + ) + + def test_entropy_threshold_file_1_49_no_flag(self): + """Non-DB file entropy delta of 1.49 should NOT trigger content shift.""" + data_file = self.test_dir / "payload.bin" + data_file.write_bytes(b"X" * 2000) + self.manager.save_manifest() + + with open(self.manager.manifest_path) as f: + manifest = json.load(f) + + rel = str(data_file.relative_to(self.test_dir)) + real_entropy = manifest["metadata"][rel]["entropy"] + manifest["metadata"][rel]["entropy"] = real_entropy + 1.49 + manifest["files"][rel] = "0" * 64 + + with open(self.manager.manifest_path, "w") as f: + json.dump(manifest, f) + + is_ok, issues = self.manager.check_integrity() + self.assertFalse( + any("Non-linear content shift" in i for i in issues), + f"Should not flag content shift at delta=1.49: {issues}", + ) + + def test_entropy_threshold_file_1_51_flags(self): + """Non-DB file entropy delta of 1.51 should trigger content shift.""" + data_file = self.test_dir / "payload.bin" + data_file.write_bytes(b"X" * 2000) + self.manager.save_manifest() + + with open(self.manager.manifest_path) as f: + manifest = json.load(f) + + rel = str(data_file.relative_to(self.test_dir)) + real_entropy = manifest["metadata"][rel]["entropy"] + manifest["metadata"][rel]["entropy"] = real_entropy + 1.51 + manifest["files"][rel] = "0" * 64 + + with open(self.manager.manifest_path, "w") as f: + json.dump(manifest, f) + + is_ok, issues = self.manager.check_integrity() + self.assertFalse(is_ok) + self.assertTrue( + any("Non-linear content shift" in i for i in issues), + f"Should flag content shift at delta=1.51: {issues}", + ) + + # ------------------------------------------------------------------ + # DB outside storage_dir + # ------------------------------------------------------------------ + + def test_db_outside_storage_dir(self): + """check_integrity must not crash when DB is outside storage_dir.""" + import tempfile as tf + + ext_dir = Path(tf.mkdtemp()) + try: + ext_db = ext_dir / "external.db" + conn = sqlite3.connect(ext_db) + conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)") + conn.close() + + mgr = IntegrityManager(self.test_dir, ext_db) + mgr.save_manifest() + is_ok, issues = mgr.check_integrity() + self.assertTrue(is_ok or isinstance(issues, list)) + finally: + shutil.rmtree(ext_dir) + + # ------------------------------------------------------------------ + # Hypothesis: entropy for any binary data + # ------------------------------------------------------------------ + + @settings( + suppress_health_check=[HealthCheck.too_slow], + deadline=None, + derandomize=True, + ) + @given(st.binary(min_size=1, max_size=4096)) + def test_entropy_monotonic_with_unique_bytes(self, data): + """More unique byte values should produce higher entropy.""" + f = self.test_dir / "hyp_ent" + f.write_bytes(data) + e = self.manager._calculate_entropy(f) + unique = len(set(data)) + if unique == 1: + self.assertAlmostEqual(e, 0.0, places=5) + else: + self.assertGreater(e, 0.0) + self.assertLessEqual(e, 8.0 + 1e-9) + + # ------------------------------------------------------------------ + # Hypothesis: save_manifest then check_integrity always consistent + # ------------------------------------------------------------------ + + @settings( + suppress_health_check=[HealthCheck.too_slow], + deadline=None, + max_examples=10, + derandomize=True, + ) + @given(st.binary(min_size=1, max_size=512)) + def test_save_then_check_always_passes(self, extra_data): + """After save_manifest(), check_integrity() must pass for unchanged state.""" + extra_file = self.test_dir / "extra.bin" + extra_file.write_bytes(extra_data) + self.manager.save_manifest() + is_ok, issues = self.manager.check_integrity() + self.assertTrue(is_ok, f"Should pass after save. Issues: {issues}") + if __name__ == "__main__": unittest.main() diff --git a/tests/backend/test_property_based.py b/tests/backend/test_property_based.py index be9fc96..1f472c0 100644 --- a/tests/backend/test_property_based.py +++ b/tests/backend/test_property_based.py @@ -947,3 +947,101 @@ def test_is_backup_suspicious_property(prev_count, prev_bytes, curr_count, curr_ result = db._is_backup_suspicious(current_stats, baseline) expected = _is_backup_suspicious_reference(current_stats, baseline) assert result == expected + + +# ===================================================================== +# CrashRecovery math property-based tests +# ===================================================================== + + +class TestCrashRecoveryMathProperties: + """Property-based tests for CrashRecovery mathematical functions.""" + + @staticmethod + def _make_recovery(): + return CrashRecovery() + + @given( + low_memory=st.booleans(), + config_missing=st.booleans(), + config_invalid=st.booleans(), + db_type=st.sampled_from(["file", "memory"]), + available_mem_mb=st.one_of( + st.floats(min_value=0, max_value=100000, allow_nan=False), + st.integers(min_value=0, max_value=100000), + st.none(), + st.just("garbage"), + ), + ) + @settings( + suppress_health_check=[HealthCheck.too_slow], + deadline=None, + derandomize=True, + ) + def test_system_entropy_always_finite( + self, low_memory, config_missing, config_invalid, db_type, available_mem_mb + ): + """Entropy and divergence must always be finite floats for any diagnosis.""" + import math as m + + recovery = self._make_recovery() + diag = { + "low_memory": low_memory, + "config_missing": config_missing, + "config_invalid": config_invalid, + "db_type": db_type, + "available_mem_mb": available_mem_mb, + } + entropy, divergence = recovery._calculate_system_entropy(diag) + assert m.isfinite(entropy), f"Non-finite entropy: {entropy}" + assert m.isfinite(divergence), f"Non-finite divergence: {divergence}" + assert entropy >= 0, f"Negative entropy: {entropy}" + assert divergence >= 0, f"Negative divergence: {divergence}" + + @given( + probs=st.lists( + st.integers(min_value=0, max_value=100), min_size=0, max_size=10 + ), + ) + @settings(derandomize=True, deadline=None) + def test_manifold_curvature_bounded(self, probs): + """Manifold curvature must always be a non-negative finite float.""" + import math as m + + recovery = self._make_recovery() + causes = [{"probability": p} for p in probs] + curvature = recovery._calculate_manifold_curvature(causes) + assert m.isfinite(curvature), f"Non-finite curvature: {curvature}" + assert curvature >= 0 or len(probs) >= 2, f"Unexpected negative: {curvature}" + + @given( + exc_msg=st.text(min_size=0, max_size=200), + exc_type_name=st.sampled_from( + [ + "RuntimeError", + "ValueError", + "sqlite3.OperationalError", + "AttributeError", + "MemoryError", + "OSError", + ] + ), + ) + @settings( + suppress_health_check=[HealthCheck.too_slow], + deadline=None, + derandomize=True, + max_examples=50, + ) + def test_analyze_cause_never_crashes(self, exc_msg, exc_type_name): + """_analyze_cause must never raise for any error message/type combo.""" + recovery = self._make_recovery() + exc_type = type(exc_type_name, (Exception,), {}) + exc_type.__name__ = exc_type_name + exc_value = Exception(exc_msg) + diagnosis = {} + causes = recovery._analyze_cause(exc_type, exc_value, diagnosis) + assert isinstance(causes, list) + for c in causes: + assert isinstance(c["probability"], int) + assert 0 <= c["probability"] <= 100