Files
MeshChatX/tests/backend/test_database_snapshots.py
2026-03-05 16:18:29 -06:00

336 lines
11 KiB
Python

import json
import os
import shutil
import tempfile
import pytest
from meshchatx.src.backend.database import Database
@pytest.fixture
def temp_dir():
dir_path = tempfile.mkdtemp()
yield dir_path
shutil.rmtree(dir_path)
def test_database_snapshot_creation(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
# Add some data
db.execute_sql(
"INSERT INTO config (key, value) VALUES (?, ?)",
("test_key", "test_value"),
)
# Create snapshot
snapshot_name = "test_snapshot"
db.create_snapshot(temp_dir, snapshot_name)
snapshot_path = os.path.join(temp_dir, "snapshots", f"{snapshot_name}.zip")
assert os.path.exists(snapshot_path)
# List snapshots
snapshots = db.list_snapshots(temp_dir)
assert len(snapshots) == 1
assert snapshots[0]["name"] == snapshot_name
def test_database_snapshot_restoration(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
# Add some data
db.execute_sql("INSERT INTO config (key, value) VALUES (?, ?)", ("v1", "original"))
# Create snapshot
db.create_snapshot(temp_dir, "snap1")
snapshot_path = os.path.join(temp_dir, "snapshots", "snap1.zip")
# Modify data
db.execute_sql("UPDATE config SET value = ? WHERE key = ?", ("modified", "v1"))
row = db.provider.fetchone("SELECT value FROM config WHERE key = ?", ("v1",))
assert row["value"] == "modified"
# Restore snapshot
db.restore_database(snapshot_path)
# Verify data is back to original
row = db.provider.fetchone("SELECT value FROM config WHERE key = ?", ("v1",))
assert row is not None
assert row["value"] == "original"
def test_database_auto_backup_logic(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
# Should create a timestamped backup
result = db.backup_database(temp_dir)
assert "database-backups" in result["path"]
assert os.path.exists(result["path"])
backup_dir = os.path.join(temp_dir, "database-backups")
zips = [f for f in os.listdir(backup_dir) if f.endswith(".zip")]
assert len(zips) == 1
def test_backup_baseline_created_on_first_backup(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
db.backup_database(temp_dir)
baseline_path = os.path.join(temp_dir, "database-backups", "backup-baseline.json")
assert os.path.exists(baseline_path)
with open(baseline_path) as f:
data = json.load(f)
assert "message_count" in data
assert "total_bytes" in data
assert "timestamp" in data
def test_backup_suspicious_when_messages_gone_skips_cleanup_and_baseline(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
db.messages.upsert_lxmf_message(
{
"hash": "h1",
"source_hash": "s",
"destination_hash": "d",
"peer_hash": "p",
"state": "delivered",
"progress": 1.0,
"is_incoming": 1,
"method": "direct",
"delivery_attempts": 0,
"next_delivery_attempt_at": None,
"title": "t",
"content": "c",
"fields": "{}",
"timestamp": 0,
"rssi": None,
"snr": None,
"quality": None,
"is_spam": 0,
"reply_to_hash": None,
}
)
result1 = db.backup_database(temp_dir, max_count=3)
assert result1.get("suspicious") is not True
backup_dir = os.path.join(temp_dir, "database-backups")
zip_count_after_first = sum(1 for f in os.listdir(backup_dir) if f.endswith(".zip"))
assert zip_count_after_first == 1
with open(os.path.join(backup_dir, "backup-baseline.json")) as f:
baseline_after_first = json.load(f)
assert baseline_after_first["message_count"] == 1
db.messages.delete_all_lxmf_messages()
assert db.messages.count_lxmf_messages() == 0
result2 = db.backup_database(temp_dir, max_count=3)
assert result2.get("suspicious") is True
assert "baseline" in result2
assert result2["baseline"]["message_count"] == 1
assert result2["current_stats"]["message_count"] == 0
zip_count_after_suspicious = sum(
1 for f in os.listdir(backup_dir) if f.endswith(".zip")
)
assert zip_count_after_suspicious == 2
assert any("SUSPICIOUS" in f for f in os.listdir(backup_dir) if f.endswith(".zip"))
with open(os.path.join(backup_dir, "backup-baseline.json")) as f:
baseline_after_suspicious = json.load(f)
assert baseline_after_suspicious["message_count"] == 1
def test_backup_suspicious_when_size_collapsed_skips_cleanup(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
for i in range(200):
db.messages.upsert_lxmf_message(
{
"hash": f"h{i}",
"source_hash": "s",
"destination_hash": "d",
"peer_hash": "p",
"state": "delivered",
"progress": 1.0,
"is_incoming": 1,
"method": "direct",
"delivery_attempts": 0,
"next_delivery_attempt_at": None,
"title": "t",
"content": "x" * 500,
"fields": "{}",
"timestamp": float(i),
"rssi": None,
"snr": None,
"quality": None,
"is_spam": 0,
"reply_to_hash": None,
}
)
db.backup_database(temp_dir, max_count=3)
baseline_path = os.path.join(temp_dir, "database-backups", "backup-baseline.json")
with open(baseline_path) as f:
baseline = json.load(f)
assert baseline["total_bytes"] > 100_000
db.messages.delete_all_lxmf_messages()
db.execute_sql("VACUUM")
result = db.backup_database(temp_dir, max_count=3)
assert result.get("suspicious") is True
backup_dir = os.path.join(temp_dir, "database-backups")
zips = [f for f in os.listdir(backup_dir) if f.endswith(".zip")]
assert len(zips) >= 2
def test_backup_normal_rotation_and_baseline_update(temp_dir):
import time
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
db.backup_database(temp_dir, max_count=2)
time.sleep(1.1)
db.backup_database(temp_dir, max_count=2)
time.sleep(1.1)
db.backup_database(temp_dir, max_count=2)
backup_dir = os.path.join(temp_dir, "database-backups")
zips = sorted([f for f in os.listdir(backup_dir) if f.endswith(".zip")])
assert len(zips) == 2
assert os.path.exists(os.path.join(backup_dir, "backup-baseline.json"))
def test_backup_failure_does_not_remove_existing_backups(temp_dir):
from unittest.mock import patch
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
db.backup_database(temp_dir)
backup_dir = os.path.join(temp_dir, "database-backups")
existing = [f for f in os.listdir(backup_dir) if f.endswith(".zip")]
assert len(existing) == 1
with patch.object(db, "_backup_to_zip", side_effect=OSError("disk full")):
with pytest.raises(OSError):
db.backup_database(temp_dir, max_count=1)
still_there = [f for f in os.listdir(backup_dir) if f.endswith(".zip")]
assert len(still_there) == 1
def test_check_db_health_at_open_no_baseline_ok(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
issues = db.check_db_health_at_open(temp_dir)
assert issues == []
def test_check_db_health_at_open_baseline_suspicious_content(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
db.messages.upsert_lxmf_message(
{
"hash": "h1",
"source_hash": "s",
"destination_hash": "d",
"peer_hash": "p",
"state": "delivered",
"progress": 1.0,
"is_incoming": 1,
"method": "direct",
"delivery_attempts": 0,
"next_delivery_attempt_at": None,
"title": "t",
"content": "c",
"fields": "{}",
"timestamp": 0,
"rssi": None,
"snr": None,
"quality": None,
"is_spam": 0,
"reply_to_hash": None,
}
)
db.backup_database(temp_dir)
db.messages.delete_all_lxmf_messages()
issues = db.check_db_health_at_open(temp_dir)
assert len(issues) >= 1
assert any("anomaly" in i.lower() or "messages" in i for i in issues)
def test_check_db_health_at_open_integrity_fail(temp_dir):
from unittest.mock import patch
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
with patch.object(db.provider, "integrity_check", return_value=[("corrupt",)]):
issues = db.check_db_health_at_open(temp_dir)
assert len(issues) >= 1
assert any("integrity" in i.lower() for i in issues)
def test_check_db_health_at_close_no_issues(temp_dir):
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
issues = db.check_db_health_at_close(temp_dir)
assert issues == []
def test_check_db_health_at_close_integrity_fail(temp_dir):
from unittest.mock import patch
db_path = os.path.join(temp_dir, "test.db")
db = Database(db_path)
db.initialize()
with patch.object(db.provider, "integrity_check", return_value=[("corrupt",)]):
issues = db.check_db_health_at_close(temp_dir)
assert len(issues) >= 1
assert any("integrity" in i.lower() for i in issues)
def test_is_backup_suspicious_does_not_mistrigger_empty_baseline():
from meshchatx.src.backend.database import Database
db = Database(":memory:")
db.initialize()
assert (
db._is_backup_suspicious({"message_count": 0, "total_bytes": 0}, None) is False
)
assert (
db._is_backup_suspicious({"message_count": 10, "total_bytes": 1000}, None)
is False
)
def test_is_backup_suspicious_does_not_mistrigger_legitimate_empty():
from meshchatx.src.backend.database import Database
db = Database(":memory:")
db.initialize()
baseline = {"message_count": 0, "total_bytes": 5000}
assert (
db._is_backup_suspicious({"message_count": 0, "total_bytes": 5000}, baseline)
is False
)
def test_is_backup_suspicious_does_not_mistrigger_small_db():
from meshchatx.src.backend.database import Database
db = Database(":memory:")
db.initialize()
baseline = {"message_count": 5, "total_bytes": 50_000}
assert (
db._is_backup_suspicious({"message_count": 5, "total_bytes": 55_000}, baseline)
is False
)