import shutil import tempfile import unittest import os import sqlite3 import json from pathlib import Path from hypothesis import given, strategies as st, settings, HealthCheck from meshchatx.src.backend.integrity_manager import IntegrityManager class TestIntegrityManagerExtensive(unittest.TestCase): def setUp(self): self.test_dir = Path(tempfile.mkdtemp()) self.db_path = self.test_dir / "database.db" self.storage_dir = self.test_dir / "storage" self.storage_dir.mkdir() # Create a valid SQLite database conn = sqlite3.connect(self.db_path) conn.execute("CREATE TABLE data (id INTEGER PRIMARY KEY, val TEXT)") conn.execute("INSERT INTO data (val) VALUES ('initial')") conn.commit() conn.close() self.manager = IntegrityManager(self.test_dir, self.db_path) def tearDown(self): shutil.rmtree(self.test_dir) def test_entropy_mathematical_bounds(self): """Verify entropy stays within [0, 8] for any byte sequence.""" # Test empty empty_file = self.test_dir / "empty" empty_file.touch() self.assertEqual(self.manager._calculate_entropy(empty_file), 0) # Test single byte repeated (minimum entropy) zero_entropy_file = self.test_dir / "zero_entropy" with open(zero_entropy_file, "wb") as f: f.write(b"AAAAAAAA" * 100) self.assertEqual(self.manager._calculate_entropy(zero_entropy_file), 0) # Test all 256 bytes (maximum entropy) max_entropy_file = self.test_dir / "max_entropy" with open(max_entropy_file, "wb") as f: f.write(bytes(range(256))) # log2(256) = 8 self.assertAlmostEqual( self.manager._calculate_entropy(max_entropy_file), 8.0, places=5 ) @settings(suppress_health_check=[HealthCheck.too_slow], deadline=None) @given(st.binary(min_size=1, max_size=1024)) def test_entropy_property(self, data): """Property: Entropy is always between 0 and 8 for non-empty data.""" temp_file = self.test_dir / "prop_test" with open(temp_file, "wb") as f: f.write(data) entropy = self.manager._calculate_entropy(temp_file) self.assertGreaterEqual(entropy, 0) self.assertLessEqual(entropy, 8.000000000000002) # Float precision def test_db_structural_tamper_detection(self): """Simulate actual SQLite corruption that bypasses hash-only checks.""" self.manager.save_manifest() # Corrupt the database file header or internal structure # Overwriting the first few bytes (SQLite header) is a guaranteed fail with open(self.db_path, "r+b") as f: f.seek(0) f.write(b"NOTASQLITEFILE") is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok, f"Integrity should fail. Issues: {issues}") self.assertTrue( any( "Database structural issue" in i or "Database structural anomaly" in i for i in issues ), f"Expected structural issue in: {issues}", ) def test_entropy_shift_detection(self): """Test detection of content type change (e.g. replacing text with random bytes).""" # 1. Start with a highly structured file (low entropy) # Use a non-critical filename to trigger the entropy check branch data_file = self.test_dir / "user_data.bin" with open(data_file, "wb") as f: f.write(b"A" * 5000) self.manager.save_manifest() # 2. Replace with high-entropy data (random bytes) with open(data_file, "wb") as f: f.write(os.urandom(5000)) is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok, f"Integrity should fail. Issues: {issues}") self.assertTrue( any("Non-linear content shift" in i or "Entropy Δ" in i for i in issues), f"Expected entropy shift in: {issues}", ) def test_ignore_patterns_extensive(self): """Verify all volatile LXMF/RNS patterns are correctly filtered.""" volatile_files = [ "lxmf_router/lxmf/outbound_stamp_costs", "lxmf_router/storage/some_volatile_file", "lxmf_router/announces/ann_data", "lxmf_router/tmp/uploading", "database.db-wal", "database.db-shm", "something.tmp", ".DS_Store", ] for v in volatile_files: rel_path = Path(v) full_path = self.test_dir / rel_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.touch() self.assertTrue( self.manager._should_ignore(str(rel_path)), f"Failed to ignore {v}" ) def test_critical_file_protection(self): """Ensure identity and config changes are always treated as critical.""" id_file = self.test_dir / "identity" id_file.write_text("secure_key") self.manager.save_manifest() # Minor modification (stays low entropy) id_file.write_text("secure_kez") is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue(any("Critical security component" in i for i in issues)) def test_missing_file_detection(self): """Verify missing files are detected even if not critical.""" misc_file = self.test_dir / "misc.txt" misc_file.write_text("data") self.manager.save_manifest() misc_file.unlink() is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue(any("File missing: misc.txt" in i for i in issues)) def test_manifest_versioning(self): """Verify the manifest includes the new version and metadata fields.""" self.manager.save_manifest() with open(self.manager.manifest_path) as f: manifest = json.load(f) self.assertEqual(manifest["version"], 2) self.assertIn("metadata", manifest) # Check if database metadata exists db_rel = str(self.db_path.relative_to(self.test_dir)) self.assertIn(db_rel, manifest["metadata"]) self.assertIn("entropy", manifest["metadata"][db_rel]) self.assertIn("size", manifest["metadata"][db_rel]) def test_database_size_divergence(self): """Verify size changes are caught when hash changes but entropy is similar.""" self.manager.save_manifest() # Grow the database with similar content conn = sqlite3.connect(self.db_path) conn.execute("INSERT INTO data (val) VALUES (?)", ("more content" * 100,)) conn.commit() conn.close() is_ok, issues = self.manager.check_integrity() # Even if entropy shift is low, hash and size changed if not is_ok: self.assertTrue(any("Database" in i for i in issues)) # ------------------------------------------------------------------ # Corrupt / malformed manifest # ------------------------------------------------------------------ def test_corrupt_manifest_json(self): """check_integrity must not crash on invalid JSON in the manifest.""" self.manager.save_manifest() with open(self.manager.manifest_path, "w") as f: f.write("{{{NOT JSON!!!") is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue(any("Integrity check failed" in i for i in issues)) def test_empty_manifest_file(self): """check_integrity must handle a 0-byte manifest gracefully.""" self.manager.save_manifest() with open(self.manager.manifest_path, "w") as f: f.truncate(0) is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue(any("Integrity check failed" in i for i in issues)) def test_manifest_missing_keys(self): """Manifest with valid JSON but missing expected keys should not crash.""" with open(self.manager.manifest_path, "w") as f: json.dump({"version": 2}, f) is_ok, issues = self.manager.check_integrity() self.assertTrue(is_ok or isinstance(issues, list)) # ------------------------------------------------------------------ # Hash consistency # ------------------------------------------------------------------ def test_hash_file_consistency(self): """Same file content must always produce the same hash.""" h1 = self.manager._hash_file(self.db_path) h2 = self.manager._hash_file(self.db_path) self.assertEqual(h1, h2) self.assertIsNotNone(h1) self.assertEqual(len(h1), 64) # SHA-256 hex length def test_hash_file_missing(self): """_hash_file on a missing path must return None.""" result = self.manager._hash_file(self.test_dir / "does_not_exist.bin") self.assertIsNone(result) # ------------------------------------------------------------------ # DB integrity check # ------------------------------------------------------------------ def test_check_db_integrity_valid(self): """_check_db_integrity on a valid DB returns (True, 'ok').""" ok, msg = self.manager._check_db_integrity(self.db_path) self.assertTrue(ok) self.assertEqual(msg, "ok") def test_check_db_integrity_not_sqlite(self): """_check_db_integrity on a non-SQLite file returns (False, ...).""" bad = self.test_dir / "bad.db" bad.write_text("this is not sqlite") ok, msg = self.manager._check_db_integrity(bad) self.assertFalse(ok) def test_check_db_integrity_missing(self): """_check_db_integrity on missing file returns (False, ...).""" ok, msg = self.manager._check_db_integrity(self.test_dir / "gone.db") self.assertFalse(ok) self.assertIn("does not exist", msg) def test_check_db_integrity_empty_file(self): """_check_db_integrity on a 0-byte file should not crash.""" empty_db = self.test_dir / "empty.db" empty_db.touch() ok, msg = self.manager._check_db_integrity(empty_db) # SQLite treats a 0-byte file as a valid empty database self.assertIsInstance(ok, bool) self.assertIsInstance(msg, str) # ------------------------------------------------------------------ # Entropy threshold boundaries # ------------------------------------------------------------------ def test_entropy_threshold_db_just_below(self): """DB entropy delta of 0.99 should NOT trigger anomaly warning.""" self.manager.save_manifest() with open(self.manager.manifest_path) as f: manifest = json.load(f) db_rel = str(self.db_path.relative_to(self.test_dir)) real_entropy = manifest["metadata"][db_rel]["entropy"] manifest["metadata"][db_rel]["entropy"] = real_entropy + 0.99 with open(self.manager.manifest_path, "w") as f: json.dump(manifest, f) is_ok, issues = self.manager.check_integrity() self.assertFalse( any("structural anomaly" in i for i in issues), f"Should not flag anomaly at delta=0.99: {issues}", ) def test_entropy_threshold_db_just_above(self): """DB entropy delta of 1.01 should trigger anomaly warning.""" self.manager.save_manifest() with open(self.manager.manifest_path) as f: manifest = json.load(f) db_rel = str(self.db_path.relative_to(self.test_dir)) real_entropy = manifest["metadata"][db_rel]["entropy"] manifest["metadata"][db_rel]["entropy"] = real_entropy + 1.01 # Also change the file hash so it triggers the comparison path manifest["files"][db_rel] = "0" * 64 with open(self.manager.manifest_path, "w") as f: json.dump(manifest, f) is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue( any("structural anomaly" in i or "Entropy" in i for i in issues), f"Should flag anomaly at delta=1.01: {issues}", ) def test_entropy_threshold_file_1_49_no_flag(self): """Non-DB file entropy delta of 1.49 should NOT trigger content shift.""" data_file = self.test_dir / "payload.bin" data_file.write_bytes(b"X" * 2000) self.manager.save_manifest() with open(self.manager.manifest_path) as f: manifest = json.load(f) rel = str(data_file.relative_to(self.test_dir)) real_entropy = manifest["metadata"][rel]["entropy"] manifest["metadata"][rel]["entropy"] = real_entropy + 1.49 manifest["files"][rel] = "0" * 64 with open(self.manager.manifest_path, "w") as f: json.dump(manifest, f) is_ok, issues = self.manager.check_integrity() self.assertFalse( any("Non-linear content shift" in i for i in issues), f"Should not flag content shift at delta=1.49: {issues}", ) def test_entropy_threshold_file_1_51_flags(self): """Non-DB file entropy delta of 1.51 should trigger content shift.""" data_file = self.test_dir / "payload.bin" data_file.write_bytes(b"X" * 2000) self.manager.save_manifest() with open(self.manager.manifest_path) as f: manifest = json.load(f) rel = str(data_file.relative_to(self.test_dir)) real_entropy = manifest["metadata"][rel]["entropy"] manifest["metadata"][rel]["entropy"] = real_entropy + 1.51 manifest["files"][rel] = "0" * 64 with open(self.manager.manifest_path, "w") as f: json.dump(manifest, f) is_ok, issues = self.manager.check_integrity() self.assertFalse(is_ok) self.assertTrue( any("Non-linear content shift" in i for i in issues), f"Should flag content shift at delta=1.51: {issues}", ) # ------------------------------------------------------------------ # DB outside storage_dir # ------------------------------------------------------------------ def test_db_outside_storage_dir(self): """check_integrity must not crash when DB is outside storage_dir.""" import tempfile as tf ext_dir = Path(tf.mkdtemp()) try: ext_db = ext_dir / "external.db" conn = sqlite3.connect(ext_db) conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)") conn.close() mgr = IntegrityManager(self.test_dir, ext_db) mgr.save_manifest() is_ok, issues = mgr.check_integrity() self.assertTrue(is_ok or isinstance(issues, list)) finally: shutil.rmtree(ext_dir) # ------------------------------------------------------------------ # Hypothesis: entropy for any binary data # ------------------------------------------------------------------ @settings( suppress_health_check=[HealthCheck.too_slow], deadline=None, derandomize=True, ) @given(st.binary(min_size=1, max_size=4096)) def test_entropy_monotonic_with_unique_bytes(self, data): """More unique byte values should produce higher entropy.""" f = self.test_dir / "hyp_ent" f.write_bytes(data) e = self.manager._calculate_entropy(f) unique = len(set(data)) if unique == 1: self.assertAlmostEqual(e, 0.0, places=5) else: self.assertGreater(e, 0.0) self.assertLessEqual(e, 8.0 + 1e-9) # ------------------------------------------------------------------ # Hypothesis: save_manifest then check_integrity always consistent # ------------------------------------------------------------------ @settings( suppress_health_check=[HealthCheck.too_slow], deadline=None, max_examples=10, derandomize=True, ) @given(st.binary(min_size=1, max_size=512)) def test_save_then_check_always_passes(self, extra_data): """After save_manifest(), check_integrity() must pass for unchanged state.""" extra_file = self.test_dir / "extra.bin" extra_file.write_bytes(extra_data) self.manager.save_manifest() is_ok, issues = self.manager.check_integrity() self.assertTrue(is_ok, f"Should pass after save. Issues: {issues}") if __name__ == "__main__": unittest.main()