Add extensive tests for CrashRecovery and IntegrityManager

This commit is contained in:
Sudo-Ivan
2026-03-06 11:36:24 -06:00
parent 4fb50f412e
commit 2b6616f162
3 changed files with 544 additions and 0 deletions
+207
View File
@@ -321,6 +321,213 @@ class TestCrashRecovery(unittest.TestCase):
self.assertGreaterEqual(legacy_cause["probability"], 80)
self.assertIn("Kernel detected: 3.10", legacy_cause["reasoning"])
# ==================================================================
# install() / disable()
# ==================================================================
def test_install_sets_excepthook(self):
"""install() should set sys.excepthook to handle_exception."""
original = sys.excepthook
try:
self.recovery.install()
self.assertEqual(sys.excepthook, self.recovery.handle_exception)
finally:
sys.excepthook = original
def test_disable_prevents_install(self):
"""After disable(), install() should not set the hook."""
original = sys.excepthook
try:
self.recovery.disable()
self.recovery.install()
self.assertNotEqual(sys.excepthook, self.recovery.handle_exception)
finally:
sys.excepthook = original
# ==================================================================
# _calculate_manifold_curvature
# ==================================================================
def test_curvature_empty_causes(self):
"""Empty causes list should return 0.0 curvature."""
self.assertEqual(self.recovery._calculate_manifold_curvature([]), 0.0)
def test_curvature_single_cause(self):
"""Single cause should return probability * 10."""
causes = [{"probability": 95}]
self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 9.5)
def test_curvature_two_causes_gradient(self):
"""Curvature should be 10 * (top - second)."""
causes = [{"probability": 90}, {"probability": 40}]
self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 5.0)
def test_curvature_equal_causes(self):
"""Equal probabilities should give curvature = 0."""
causes = [{"probability": 50}, {"probability": 50}]
self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 0.0)
def test_curvature_many_causes(self):
"""Only top 2 probabilities matter for curvature."""
causes = [
{"probability": 80},
{"probability": 30},
{"probability": 10},
]
self.assertAlmostEqual(self.recovery._calculate_manifold_curvature(causes), 5.0)
# ==================================================================
# _calculate_system_entropy edge cases
# ==================================================================
def test_entropy_empty_diagnosis(self):
"""Empty diagnosis should return baseline entropy (all ideal)."""
entropy, divergence = self.recovery._calculate_system_entropy({})
self.assertGreater(entropy, 0)
self.assertAlmostEqual(divergence, 0.0, places=5)
def test_entropy_all_critical(self):
"""All dimensions critical should maximize entropy and divergence."""
diag = {
"low_memory": True,
"config_missing": True,
"db_type": "memory",
}
entropy, divergence = self.recovery._calculate_system_entropy(diag)
base_entropy, base_div = self.recovery._calculate_system_entropy({})
self.assertGreater(entropy, base_entropy)
self.assertGreater(divergence, base_div)
def test_entropy_invalid_mem_value(self):
"""Non-numeric available_mem_mb should not crash."""
diag = {"available_mem_mb": "not_a_number", "low_memory": True}
entropy, divergence = self.recovery._calculate_system_entropy(diag)
self.assertIsInstance(entropy, float)
self.assertIsInstance(divergence, float)
def test_entropy_none_mem_value(self):
"""None available_mem_mb should not crash."""
diag = {"available_mem_mb": None}
entropy, divergence = self.recovery._calculate_system_entropy(diag)
self.assertIsInstance(entropy, float)
def test_divergence_nonnegative(self):
"""KL divergence must always be non-negative."""
test_cases = [
{},
{"low_memory": True},
{"config_missing": True},
{"db_type": "memory"},
{"low_memory": True, "config_missing": True, "db_type": "memory"},
]
for diag in test_cases:
_, div = self.recovery._calculate_system_entropy(diag)
self.assertGreaterEqual(div, 0.0, f"Negative divergence for {diag}")
# ==================================================================
# legacy_kernel regex safety
# ==================================================================
def test_legacy_kernel_non_linux(self):
"""Non-Linux platforms should not crash on legacy_kernel check."""
with (
patch("platform.system", return_value="Windows"),
patch("platform.release", return_value="10"),
):
exc_type = RuntimeError
exc_value = RuntimeError("test")
causes = self.recovery._analyze_cause(exc_type, exc_value, {})
self.assertIsInstance(causes, list)
def test_legacy_kernel_unusual_release(self):
"""Unusual release strings should not crash."""
with (
patch("platform.system", return_value="Linux"),
patch("platform.release", return_value="unknown"),
):
exc_type = RuntimeError
exc_value = RuntimeError("test")
causes = self.recovery._analyze_cause(exc_type, exc_value, {})
self.assertIsInstance(causes, list)
def test_legacy_kernel_empty_release(self):
"""Empty release string should not crash."""
with (
patch("platform.system", return_value="Linux"),
patch("platform.release", return_value=""),
):
exc_type = RuntimeError
exc_value = RuntimeError("test")
causes = self.recovery._analyze_cause(exc_type, exc_value, {})
self.assertIsInstance(causes, list)
# ==================================================================
# run_reticulum_diagnosis isolation
# ==================================================================
def test_reticulum_diagnosis_invalid_config_content(self):
"""Invalid config file content should be flagged."""
rns_dir = os.path.join(self.test_dir, "rns_invalid")
os.makedirs(rns_dir)
with open(os.path.join(rns_dir, "config"), "w") as f:
f.write("this is not a valid reticulum config")
self.recovery.update_paths(reticulum_config_dir=rns_dir)
output = io.StringIO()
results = self.recovery.run_reticulum_diagnosis(file=output)
report = output.getvalue()
self.assertIn("[ERROR]", report)
self.assertTrue(results.get("config_invalid", False))
def test_reticulum_diagnosis_empty_logfile(self):
"""Empty log file should be handled gracefully."""
rns_dir = os.path.join(self.test_dir, "rns_empty_log")
os.makedirs(rns_dir)
with open(os.path.join(rns_dir, "config"), "w") as f:
f.write("[reticulum]\n")
with open(os.path.join(rns_dir, "logfile"), "w") as f:
pass
self.recovery.update_paths(reticulum_config_dir=rns_dir)
output = io.StringIO()
self.recovery.run_reticulum_diagnosis(file=output)
report = output.getvalue()
self.assertIn("Log file is empty", report)
# ==================================================================
# Diagnosis edge cases
# ==================================================================
def test_diagnosis_empty_db_file(self):
"""0-byte database file should trigger a warning."""
open(self.db_path, "w").close() # noqa: SIM115
output = io.StringIO()
self.recovery.run_diagnosis(file=output)
report = output.getvalue()
self.assertIn("empty (0 bytes)", report)
def test_update_paths(self):
"""update_paths should correctly update internal state."""
new_storage = os.path.join(self.test_dir, "new_storage")
self.recovery.update_paths(storage_dir=new_storage)
self.assertEqual(self.recovery.storage_dir, new_storage)
def test_keyboard_interrupt_passthrough(self):
"""KeyboardInterrupt should be passed to the default hook."""
called = {}
def mock_hook(exc_type, exc_value, exc_tb):
called["invoked"] = True
original = sys.__excepthook__
sys.__excepthook__ = mock_hook
try:
self.recovery.handle_exception(KeyboardInterrupt, KeyboardInterrupt(), None)
self.assertTrue(called.get("invoked", False))
finally:
sys.__excepthook__ = original
if __name__ == "__main__":
unittest.main()
+239
View File
@@ -183,6 +183,245 @@ class TestIntegrityManagerExtensive(unittest.TestCase):
if not is_ok:
self.assertTrue(any("Database" in i for i in issues))
# ------------------------------------------------------------------
# Corrupt / malformed manifest
# ------------------------------------------------------------------
def test_corrupt_manifest_json(self):
"""check_integrity must not crash on invalid JSON in the manifest."""
self.manager.save_manifest()
with open(self.manager.manifest_path, "w") as f:
f.write("{{{NOT JSON!!!")
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(any("Integrity check failed" in i for i in issues))
def test_empty_manifest_file(self):
"""check_integrity must handle a 0-byte manifest gracefully."""
self.manager.save_manifest()
with open(self.manager.manifest_path, "w") as f:
f.truncate(0)
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(any("Integrity check failed" in i for i in issues))
def test_manifest_missing_keys(self):
"""Manifest with valid JSON but missing expected keys should not crash."""
with open(self.manager.manifest_path, "w") as f:
json.dump({"version": 2}, f)
is_ok, issues = self.manager.check_integrity()
self.assertTrue(is_ok or isinstance(issues, list))
# ------------------------------------------------------------------
# Hash consistency
# ------------------------------------------------------------------
def test_hash_file_consistency(self):
"""Same file content must always produce the same hash."""
h1 = self.manager._hash_file(self.db_path)
h2 = self.manager._hash_file(self.db_path)
self.assertEqual(h1, h2)
self.assertIsNotNone(h1)
self.assertEqual(len(h1), 64) # SHA-256 hex length
def test_hash_file_missing(self):
"""_hash_file on a missing path must return None."""
result = self.manager._hash_file(self.test_dir / "does_not_exist.bin")
self.assertIsNone(result)
# ------------------------------------------------------------------
# DB integrity check
# ------------------------------------------------------------------
def test_check_db_integrity_valid(self):
"""_check_db_integrity on a valid DB returns (True, 'ok')."""
ok, msg = self.manager._check_db_integrity(self.db_path)
self.assertTrue(ok)
self.assertEqual(msg, "ok")
def test_check_db_integrity_not_sqlite(self):
"""_check_db_integrity on a non-SQLite file returns (False, ...)."""
bad = self.test_dir / "bad.db"
bad.write_text("this is not sqlite")
ok, msg = self.manager._check_db_integrity(bad)
self.assertFalse(ok)
def test_check_db_integrity_missing(self):
"""_check_db_integrity on missing file returns (False, ...)."""
ok, msg = self.manager._check_db_integrity(self.test_dir / "gone.db")
self.assertFalse(ok)
self.assertIn("does not exist", msg)
def test_check_db_integrity_empty_file(self):
"""_check_db_integrity on a 0-byte file should not crash."""
empty_db = self.test_dir / "empty.db"
empty_db.touch()
ok, msg = self.manager._check_db_integrity(empty_db)
# SQLite treats a 0-byte file as a valid empty database
self.assertIsInstance(ok, bool)
self.assertIsInstance(msg, str)
# ------------------------------------------------------------------
# Entropy threshold boundaries
# ------------------------------------------------------------------
def test_entropy_threshold_db_just_below(self):
"""DB entropy delta of 0.99 should NOT trigger anomaly warning."""
self.manager.save_manifest()
with open(self.manager.manifest_path) as f:
manifest = json.load(f)
db_rel = str(self.db_path.relative_to(self.test_dir))
real_entropy = manifest["metadata"][db_rel]["entropy"]
manifest["metadata"][db_rel]["entropy"] = real_entropy + 0.99
with open(self.manager.manifest_path, "w") as f:
json.dump(manifest, f)
is_ok, issues = self.manager.check_integrity()
self.assertFalse(
any("structural anomaly" in i for i in issues),
f"Should not flag anomaly at delta=0.99: {issues}",
)
def test_entropy_threshold_db_just_above(self):
"""DB entropy delta of 1.01 should trigger anomaly warning."""
self.manager.save_manifest()
with open(self.manager.manifest_path) as f:
manifest = json.load(f)
db_rel = str(self.db_path.relative_to(self.test_dir))
real_entropy = manifest["metadata"][db_rel]["entropy"]
manifest["metadata"][db_rel]["entropy"] = real_entropy + 1.01
# Also change the file hash so it triggers the comparison path
manifest["files"][db_rel] = "0" * 64
with open(self.manager.manifest_path, "w") as f:
json.dump(manifest, f)
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(
any("structural anomaly" in i or "Entropy" in i for i in issues),
f"Should flag anomaly at delta=1.01: {issues}",
)
def test_entropy_threshold_file_1_49_no_flag(self):
"""Non-DB file entropy delta of 1.49 should NOT trigger content shift."""
data_file = self.test_dir / "payload.bin"
data_file.write_bytes(b"X" * 2000)
self.manager.save_manifest()
with open(self.manager.manifest_path) as f:
manifest = json.load(f)
rel = str(data_file.relative_to(self.test_dir))
real_entropy = manifest["metadata"][rel]["entropy"]
manifest["metadata"][rel]["entropy"] = real_entropy + 1.49
manifest["files"][rel] = "0" * 64
with open(self.manager.manifest_path, "w") as f:
json.dump(manifest, f)
is_ok, issues = self.manager.check_integrity()
self.assertFalse(
any("Non-linear content shift" in i for i in issues),
f"Should not flag content shift at delta=1.49: {issues}",
)
def test_entropy_threshold_file_1_51_flags(self):
"""Non-DB file entropy delta of 1.51 should trigger content shift."""
data_file = self.test_dir / "payload.bin"
data_file.write_bytes(b"X" * 2000)
self.manager.save_manifest()
with open(self.manager.manifest_path) as f:
manifest = json.load(f)
rel = str(data_file.relative_to(self.test_dir))
real_entropy = manifest["metadata"][rel]["entropy"]
manifest["metadata"][rel]["entropy"] = real_entropy + 1.51
manifest["files"][rel] = "0" * 64
with open(self.manager.manifest_path, "w") as f:
json.dump(manifest, f)
is_ok, issues = self.manager.check_integrity()
self.assertFalse(is_ok)
self.assertTrue(
any("Non-linear content shift" in i for i in issues),
f"Should flag content shift at delta=1.51: {issues}",
)
# ------------------------------------------------------------------
# DB outside storage_dir
# ------------------------------------------------------------------
def test_db_outside_storage_dir(self):
"""check_integrity must not crash when DB is outside storage_dir."""
import tempfile as tf
ext_dir = Path(tf.mkdtemp())
try:
ext_db = ext_dir / "external.db"
conn = sqlite3.connect(ext_db)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
conn.close()
mgr = IntegrityManager(self.test_dir, ext_db)
mgr.save_manifest()
is_ok, issues = mgr.check_integrity()
self.assertTrue(is_ok or isinstance(issues, list))
finally:
shutil.rmtree(ext_dir)
# ------------------------------------------------------------------
# Hypothesis: entropy for any binary data
# ------------------------------------------------------------------
@settings(
suppress_health_check=[HealthCheck.too_slow],
deadline=None,
derandomize=True,
)
@given(st.binary(min_size=1, max_size=4096))
def test_entropy_monotonic_with_unique_bytes(self, data):
"""More unique byte values should produce higher entropy."""
f = self.test_dir / "hyp_ent"
f.write_bytes(data)
e = self.manager._calculate_entropy(f)
unique = len(set(data))
if unique == 1:
self.assertAlmostEqual(e, 0.0, places=5)
else:
self.assertGreater(e, 0.0)
self.assertLessEqual(e, 8.0 + 1e-9)
# ------------------------------------------------------------------
# Hypothesis: save_manifest then check_integrity always consistent
# ------------------------------------------------------------------
@settings(
suppress_health_check=[HealthCheck.too_slow],
deadline=None,
max_examples=10,
derandomize=True,
)
@given(st.binary(min_size=1, max_size=512))
def test_save_then_check_always_passes(self, extra_data):
"""After save_manifest(), check_integrity() must pass for unchanged state."""
extra_file = self.test_dir / "extra.bin"
extra_file.write_bytes(extra_data)
self.manager.save_manifest()
is_ok, issues = self.manager.check_integrity()
self.assertTrue(is_ok, f"Should pass after save. Issues: {issues}")
if __name__ == "__main__":
unittest.main()
+98
View File
@@ -947,3 +947,101 @@ def test_is_backup_suspicious_property(prev_count, prev_bytes, curr_count, curr_
result = db._is_backup_suspicious(current_stats, baseline)
expected = _is_backup_suspicious_reference(current_stats, baseline)
assert result == expected
# =====================================================================
# CrashRecovery math property-based tests
# =====================================================================
class TestCrashRecoveryMathProperties:
"""Property-based tests for CrashRecovery mathematical functions."""
@staticmethod
def _make_recovery():
return CrashRecovery()
@given(
low_memory=st.booleans(),
config_missing=st.booleans(),
config_invalid=st.booleans(),
db_type=st.sampled_from(["file", "memory"]),
available_mem_mb=st.one_of(
st.floats(min_value=0, max_value=100000, allow_nan=False),
st.integers(min_value=0, max_value=100000),
st.none(),
st.just("garbage"),
),
)
@settings(
suppress_health_check=[HealthCheck.too_slow],
deadline=None,
derandomize=True,
)
def test_system_entropy_always_finite(
self, low_memory, config_missing, config_invalid, db_type, available_mem_mb
):
"""Entropy and divergence must always be finite floats for any diagnosis."""
import math as m
recovery = self._make_recovery()
diag = {
"low_memory": low_memory,
"config_missing": config_missing,
"config_invalid": config_invalid,
"db_type": db_type,
"available_mem_mb": available_mem_mb,
}
entropy, divergence = recovery._calculate_system_entropy(diag)
assert m.isfinite(entropy), f"Non-finite entropy: {entropy}"
assert m.isfinite(divergence), f"Non-finite divergence: {divergence}"
assert entropy >= 0, f"Negative entropy: {entropy}"
assert divergence >= 0, f"Negative divergence: {divergence}"
@given(
probs=st.lists(
st.integers(min_value=0, max_value=100), min_size=0, max_size=10
),
)
@settings(derandomize=True, deadline=None)
def test_manifold_curvature_bounded(self, probs):
"""Manifold curvature must always be a non-negative finite float."""
import math as m
recovery = self._make_recovery()
causes = [{"probability": p} for p in probs]
curvature = recovery._calculate_manifold_curvature(causes)
assert m.isfinite(curvature), f"Non-finite curvature: {curvature}"
assert curvature >= 0 or len(probs) >= 2, f"Unexpected negative: {curvature}"
@given(
exc_msg=st.text(min_size=0, max_size=200),
exc_type_name=st.sampled_from(
[
"RuntimeError",
"ValueError",
"sqlite3.OperationalError",
"AttributeError",
"MemoryError",
"OSError",
]
),
)
@settings(
suppress_health_check=[HealthCheck.too_slow],
deadline=None,
derandomize=True,
max_examples=50,
)
def test_analyze_cause_never_crashes(self, exc_msg, exc_type_name):
"""_analyze_cause must never raise for any error message/type combo."""
recovery = self._make_recovery()
exc_type = type(exc_type_name, (Exception,), {})
exc_type.__name__ = exc_type_name
exc_value = Exception(exc_msg)
diagnosis = {}
causes = recovery._analyze_cause(exc_type, exc_value, diagnosis)
assert isinstance(causes, list)
for c in causes:
assert isinstance(c["probability"], int)
assert 0 <= c["probability"] <= 100