refactor(tests): streamline test assertions and improve coverage for propagation nodes and licenses; add new tests for license rendering and propagation node state management

This commit is contained in:
Ivan
2026-04-15 03:10:20 -05:00
parent f8ce7563aa
commit fb775b74b3
6 changed files with 169 additions and 3 deletions

View File

@@ -47,9 +47,7 @@ class TestIntervalActionDue:
interval = 600
now = 500_000.0
last = int(now - 601)
assert (
interval_action_due(interval > 0, last, interval, now) is True
)
assert interval_action_due(interval > 0, last, interval, now) is True
def test_large_interval_last_at_slightly_in_future_ntp(self):
"""Small positive skew still triggers one correction announce."""

View File

@@ -67,6 +67,8 @@ async def test_auto_propagation_api(mock_rns_minimal, temp_dir):
data = json.loads(response.body)
assert "lxmf_preferred_propagation_node_auto_select" in data["config"]
assert data["config"]["lxmf_preferred_propagation_node_auto_select"] is False
assert "lxmf_propagation_transfer_limit_in_bytes" in data["config"]
assert "lxmf_propagation_sync_limit_in_bytes" in data["config"]
# 2. Test PATCH /api/v1/config updates auto_select
patch_handler = None
@@ -102,3 +104,20 @@ async def test_auto_propagation_api(mock_rns_minimal, temp_dir):
assert (
app_instance.config.lxmf_preferred_propagation_node_auto_select.get() is False
)
# Update transfer/sync limits and validate clamping/application
mock_request = MagicMock()
mock_request.json = MagicMock(return_value=asyncio.Future())
mock_request.json.return_value.set_result(
{
"lxmf_propagation_transfer_limit_in_bytes": 250_000,
"lxmf_propagation_sync_limit_in_bytes": 9_000_000,
},
)
response = await patch_handler(mock_request)
data = json.loads(response.body)
assert data["config"]["lxmf_propagation_transfer_limit_in_bytes"] == 250_000
assert data["config"]["lxmf_propagation_sync_limit_in_bytes"] == 9_000_000
assert app_instance.config.lxmf_propagation_transfer_limit_in_bytes.get() == 250_000
assert app_instance.config.lxmf_propagation_sync_limit_in_bytes.get() == 9_000_000

View File

@@ -1,8 +1,11 @@
from pathlib import Path
from unittest.mock import patch
from meshchatx.src.backend.licenses_collector import (
_flatten_pnpm_licenses_json,
build_licenses_payload,
render_third_party_notices,
write_embedded_license_artifacts,
)
@@ -58,3 +61,66 @@ def test_build_licenses_payload_composes_counts_and_meta():
assert payload["meta"]["frontend_count"] == 1
assert payload["meta"]["frontend_source"] == "pnpm"
assert payload["meta"]["generated_at"].endswith("Z")
def test_render_third_party_notices_contains_sections_and_rows():
payload = {
"backend": [
{"name": "rns", "version": "1.0", "author": "Author A", "license": "MIT"}
],
"frontend": [
{"name": "vue", "version": "3.0", "author": "Author B", "license": "MIT"}
],
"meta": {"generated_at": "2026-01-01T00:00:00Z", "frontend_source": "pnpm"},
}
rendered = render_third_party_notices(payload)
assert "Reticulum MeshChatX - Third-party notices" in rendered
assert "Python dependencies" in rendered
assert "Node dependencies" in rendered
assert "rns 1.0" in rendered
assert "vue 3.0" in rendered
def test_write_embedded_license_artifacts_writes_files(tmp_path):
payload = {
"backend": [{"name": "rns", "version": "1", "author": "a", "license": "MIT"}],
"frontend": [{"name": "vue", "version": "3", "author": "b", "license": "MIT"}],
"meta": {"generated_at": "2026-01-01T00:00:00Z", "frontend_source": "pnpm"},
}
with patch(
"meshchatx.src.backend.licenses_collector.build_licenses_payload",
return_value=payload,
):
result = write_embedded_license_artifacts(repo_root=tmp_path)
frontend_path = Path(result["frontend_path"])
notices_path = Path(result["notices_path"])
assert frontend_path.exists()
assert notices_path.exists()
assert '"name": "vue"' in frontend_path.read_text(encoding="utf-8")
assert "Reticulum MeshChatX - Third-party notices" in notices_path.read_text(
encoding="utf-8"
)
def test_write_embedded_license_artifacts_preserves_existing_frontend_when_empty(
tmp_path,
):
data_dir = tmp_path / "meshchatx" / "src" / "backend" / "data"
data_dir.mkdir(parents=True, exist_ok=True)
frontend_path = data_dir / "licenses_frontend.json"
frontend_path.write_text('[{"name":"kept"}]\n', encoding="utf-8")
payload = {
"backend": [],
"frontend": [],
"meta": {"generated_at": "2026-01-01T00:00:00Z", "frontend_source": "none"},
}
with patch(
"meshchatx.src.backend.licenses_collector.build_licenses_payload",
return_value=payload,
):
result = write_embedded_license_artifacts(repo_root=tmp_path)
assert result["frontend_written"] is False
assert frontend_path.read_text(encoding="utf-8") == '[{"name":"kept"}]\n'

View File

@@ -12,6 +12,7 @@ from meshchatx.meshchat import ReticulumMeshChat
# Store original constants
PR_IDLE = LXMF.LXMRouter.PR_IDLE
PR_PATH_REQUESTED = LXMF.LXMRouter.PR_PATH_REQUESTED
PR_PATH_TIMEOUT = LXMF.LXMRouter.PR_PATH_TIMEOUT
PR_RECEIVING = LXMF.LXMRouter.PR_RECEIVING
PR_COMPLETE = LXMF.LXMRouter.PR_COMPLETE
PR_FAILED = LXMF.LXMRouter.PR_FAILED
@@ -36,6 +37,7 @@ def mock_app(temp_dir):
# Set up constants on the mock class
mock_router_class.PR_IDLE = PR_IDLE
mock_router_class.PR_PATH_REQUESTED = PR_PATH_REQUESTED
mock_router_class.PR_PATH_TIMEOUT = PR_PATH_TIMEOUT
mock_router_class.PR_RECEIVING = PR_RECEIVING
mock_router_class.PR_COMPLETE = PR_COMPLETE
mock_router_class.PR_FAILED = PR_FAILED
@@ -43,6 +45,7 @@ def mock_app(temp_dir):
mock_router = mock_router_class.return_value
mock_router.PR_IDLE = PR_IDLE
mock_router.PR_PATH_REQUESTED = PR_PATH_REQUESTED
mock_router.PR_PATH_TIMEOUT = PR_PATH_TIMEOUT
mock_router.PR_RECEIVING = PR_RECEIVING
mock_router.PR_COMPLETE = PR_COMPLETE
mock_router.PR_FAILED = PR_FAILED
@@ -64,6 +67,7 @@ def mock_app(temp_dir):
) as mock_utils_router:
mock_utils_router.PR_IDLE = PR_IDLE
mock_utils_router.PR_PATH_REQUESTED = PR_PATH_REQUESTED
mock_utils_router.PR_PATH_TIMEOUT = PR_PATH_TIMEOUT
mock_utils_router.PR_RECEIVING = PR_RECEIVING
mock_utils_router.PR_COMPLETE = PR_COMPLETE
mock_utils_router.PR_FAILED = PR_FAILED
@@ -183,6 +187,7 @@ async def test_propagation_node_status_mapping(mock_app):
states_to_test = [
(PR_IDLE, "idle"),
(PR_PATH_REQUESTED, "path_requested"),
(PR_PATH_TIMEOUT, "path_timeout"),
(PR_RECEIVING, "receiving"),
(PR_COMPLETE, "complete"),
(PR_FAILED, "failed"),
@@ -193,6 +198,50 @@ async def test_propagation_node_status_mapping(mock_app):
response = await status_handler(None)
data = json.loads(response.body)
assert data["propagation_node_status"]["state"] == state_str
assert "local_propagation_node" in data
@pytest.mark.asyncio
async def test_local_propagation_node_stop_and_restart_routes(mock_app):
mock_router = mock_app.current_context.message_router
mock_router.propagation_node = True
mock_router.compile_stats.return_value = {
"uptime": 42,
"messagestore": {"count": 3, "bytes": 2048, "limit": 4096},
"clients": {
"client_propagation_messages_received": 7,
"client_propagation_messages_served": 5,
},
"delivery_limit": 10,
"propagation_limit": 20,
"sync_limit": 30,
"target_stamp_cost": 16,
"static_peers": 1,
"discovered_peers": 2,
"total_peers": 3,
"max_peers": 10,
}
stop_handler = next(
r.handler
for r in mock_app.get_routes()
if r.path == "/api/v1/lxmf/propagation-node/stop"
)
restart_handler = next(
r.handler
for r in mock_app.get_routes()
if r.path == "/api/v1/lxmf/propagation-node/restart"
)
stop_response = await stop_handler(None)
stop_data = json.loads(stop_response.body)
assert stop_data["message"] == "Local propagation node stopped"
mock_router.disable_propagation.assert_called()
restart_response = await restart_handler(None)
restart_data = json.loads(restart_response.body)
assert restart_data["message"] == "Local propagation node restarted"
mock_router.enable_propagation.assert_called()
@pytest.mark.asyncio
@@ -225,3 +274,30 @@ async def test_user_provided_node_hash(mock_app):
mock_app.current_context.message_router.request_messages_from_propagation_node.assert_called_with(
mock_app.current_context.identity,
)
def test_convert_propagation_node_state_maps_all_lxmf_transfer_states():
from meshchatx.src.backend.meshchat_utils import (
convert_propagation_node_state_to_string,
)
r = LXMF.LXMRouter
expected = {
r.PR_IDLE: "idle",
r.PR_PATH_REQUESTED: "path_requested",
r.PR_LINK_ESTABLISHING: "link_establishing",
r.PR_LINK_ESTABLISHED: "link_established",
r.PR_REQUEST_SENT: "request_sent",
r.PR_RECEIVING: "receiving",
r.PR_RESPONSE_RECEIVED: "response_received",
r.PR_COMPLETE: "complete",
r.PR_NO_PATH: "no_path",
r.PR_LINK_FAILED: "link_failed",
r.PR_TRANSFER_FAILED: "transfer_failed",
r.PR_NO_IDENTITY_RCVD: "no_identity_received",
r.PR_NO_ACCESS: "no_access",
r.PR_FAILED: "failed",
r.PR_PATH_TIMEOUT: "path_timeout",
}
for state_val, state_str in expected.items():
assert convert_propagation_node_state_to_string(state_val) == state_str

View File

@@ -61,6 +61,8 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node
# Test with invalid limit (should not crash)
request.query = {"limit": "invalid"}
@@ -68,6 +70,8 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node
# Test with missing limit (should not crash)
request.query = {}
@@ -75,3 +79,5 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert response.status == 200
data = json.loads(response.body)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node

View File

@@ -716,6 +716,7 @@ def test_convert_propagation_node_state_to_string_robustness(state):
"no_identity_received",
"no_access",
"failed",
"path_timeout",
"unknown",
}
assert result in allowed