diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 862a545..cfbbdd6 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -2222,9 +2222,12 @@ class ReticulumMeshChat: # set outbound propagation node if destination_hash is not None and destination_hash != "": try: + destination_hash_bytes = bytes.fromhex(destination_hash) ctx.message_router.set_outbound_propagation_node( - bytes.fromhex(destination_hash), + destination_hash_bytes, ) + with contextlib.suppress(Exception): + RNS.Transport.request_path(destination_hash_bytes) except Exception: # failed to set propagation node, clear it to ensure we don't use an old one by mistake self.remove_active_propagation_node(context=ctx) @@ -2402,7 +2405,21 @@ class ReticulumMeshChat: message_store = stats.get("messagestore", {}) if isinstance(stats, dict) else {} clients = stats.get("clients", {}) if isinstance(stats, dict) else {} + peers = stats.get("peers", {}) if isinstance(stats, dict) else {} uptime = _numeric(stats.get("uptime", 0)) if isinstance(stats, dict) else 0 + peer_rx_bytes = 0 + peer_tx_bytes = 0 + if isinstance(peers, dict): + for peer_stats in peers.values(): + if not isinstance(peer_stats, dict): + continue + peer_rx_bytes += int(_numeric(peer_stats.get("rx_bytes", 0))) + peer_tx_bytes += int(_numeric(peer_stats.get("tx_bytes", 0))) + unpeered_rx_bytes = ( + int(_numeric(stats.get("unpeered_propagation_rx_bytes", 0))) + if isinstance(stats, dict) + else 0 + ) delivery_limit = ( _numeric(stats.get("delivery_limit", 0)) if isinstance(stats, dict) @@ -2434,6 +2451,9 @@ class ReticulumMeshChat: "client_propagation_messages_served", 0, ), + "rx_bytes": peer_rx_bytes + unpeered_rx_bytes, + "tx_bytes": peer_tx_bytes, + "unpeered_rx_bytes": unpeered_rx_bytes, "static_peers": stats.get("static_peers", 0) if isinstance(stats, dict) else 0, @@ -7349,7 +7369,8 @@ class ReticulumMeshChat: @routes.get("/api/v1/lxmf/propagation-node/sync") async def propagation_node_sync(request): # ensure propagation node is configured before attempting to sync - if self.message_router.get_outbound_propagation_node() is None: + outbound_node = self.message_router.get_outbound_propagation_node() + if outbound_node is None: return web.json_response( { "message": "A propagation node must be configured to sync messages.", @@ -7357,6 +7378,12 @@ class ReticulumMeshChat: status=400, ) + # proactively request path, but do not block/fail here. + # LXMF internally manages PR_PATH_REQUESTED and retries. + if not RNS.Transport.has_path(outbound_node): + with contextlib.suppress(Exception): + RNS.Transport.request_path(outbound_node) + # request messages from propagation node await self.sync_propagation_nodes() @@ -7500,6 +7527,15 @@ class ReticulumMeshChat: if updated_at and "+" not in updated_at and "Z" not in updated_at: updated_at += "Z" + is_local_node = ( + announce["identity_hash"] == local_identity_hash + or announce["destination_hash"] == local_destination_hash + ) + if is_local_node and isinstance(local_stats, dict): + local_running = local_stats.get("is_running") + if isinstance(local_running, bool): + is_propagation_enabled = local_running + lxmf_propagation_nodes.append( { "destination_hash": announce["destination_hash"], @@ -7507,16 +7543,8 @@ class ReticulumMeshChat: "operator_display_name": operator_display_name, "is_propagation_enabled": is_propagation_enabled, "per_transfer_limit": per_transfer_limit, - "is_local_node": ( - announce["identity_hash"] == local_identity_hash - or announce["destination_hash"] == local_destination_hash - ), - "local_node_stats": ( - local_stats - if announce["identity_hash"] == local_identity_hash - or announce["destination_hash"] == local_destination_hash - else None - ), + "is_local_node": is_local_node, + "local_node_stats": (local_stats if is_local_node else None), "created_at": created_at, "updated_at": updated_at, }, @@ -7537,7 +7565,12 @@ class ReticulumMeshChat: "destination_hash": local_destination_hash, "identity_hash": local_identity_hash, "operator_display_name": ctx.config.display_name.get(), - "is_propagation_enabled": ctx.config.lxmf_local_propagation_node_enabled.get(), + "is_propagation_enabled": ( + local_stats.get("is_running") + if isinstance(local_stats, dict) + and isinstance(local_stats.get("is_running"), bool) + else ctx.config.lxmf_local_propagation_node_enabled.get() + ), "per_transfer_limit": int( getattr( ctx.message_router, "propagation_per_transfer_limit", 0 @@ -7564,6 +7597,30 @@ class ReticulumMeshChat: # convert destination hash to bytes destination_hash = bytes.fromhex(destination_hash) + destination_hash_hex = destination_hash.hex() + local_hashes: set[str] = set() + with contextlib.suppress(Exception): + if self.current_context and self.current_context.identity: + local_hashes.add(self.current_context.identity.hash.hex()) + with contextlib.suppress(Exception): + if self.local_lxmf_destination is not None: + local_hashes.add(self.local_lxmf_destination.hash.hex()) + with contextlib.suppress(Exception): + if self.current_context and self.current_context.message_router: + pdest = self.current_context.message_router.propagation_destination + if pdest is not None and getattr(pdest, "hash", None): + local_hashes.add(pdest.hash.hex()) + + if destination_hash_hex in local_hashes: + return web.json_response( + { + "path": { + "hops": 0, + "next_hop": destination_hash_hex, + "next_hop_interface": "Local", + }, + }, + ) # check if user wants to request the path from the network right now request_query_param = request.query.get("request", "false") @@ -8442,6 +8499,25 @@ class ReticulumMeshChat: try: status = self.bot_handler.get_status() templates = self.bot_handler.get_available_templates() + if self.database: + for bot in status.get("bots") or []: + lxmf_addr = bot.get("lxmf_address") or bot.get("full_address") + if not lxmf_addr: + bot["last_announce_at"] = None + continue + lxmf_addr = str(lxmf_addr).strip().lower() + ann = self.database.announces.get_announce_by_hash(lxmf_addr) + if not ann: + bot["last_announce_at"] = None + continue + arow = dict(ann) if not isinstance(ann, dict) else ann + ts = arow.get("updated_at") + if ts is not None and hasattr(ts, "isoformat"): + bot["last_announce_at"] = ts.isoformat() + else: + bot["last_announce_at"] = ( + str(ts) if ts is not None else None + ) return web.json_response( { "status": status, @@ -8541,6 +8617,62 @@ class ReticulumMeshChat: status=500, ) + @routes.patch("/api/v1/bots/update") + async def bots_update(request): + data = await request.json() + bot_id = data.get("bot_id") + name = data.get("name") + + if not bot_id: + return web.json_response( + {"message": "bot_id is required"}, + status=400, + ) + + try: + self.bot_handler.update_bot_name(bot_id, name) + return web.json_response({"success": True}) + except ValueError as e: + return web.json_response( + {"message": str(e)}, + status=400, + ) + except Exception as e: + return web.json_response( + {"message": str(e)}, + status=500, + ) + + @routes.post("/api/v1/bots/announce") + async def bots_announce(request): + data = await request.json() + bot_id = data.get("bot_id") + + if not bot_id: + return web.json_response( + {"message": "bot_id is required"}, + status=400, + ) + + try: + self.bot_handler.request_announce(bot_id) + return web.json_response({"success": True}) + except ValueError as e: + return web.json_response( + {"message": str(e)}, + status=400, + ) + except RuntimeError as e: + return web.json_response( + {"message": str(e)}, + status=409, + ) + except Exception as e: + return web.json_response( + {"message": str(e)}, + status=500, + ) + @routes.get("/api/v1/bots/export") async def bots_export(request): bot_id = request.query.get("bot_id") @@ -10816,6 +10948,29 @@ class ReticulumMeshChat: # update last synced at timestamp ctx.config.lxmf_preferred_propagation_node_last_synced_at.set(int(time.time())) + outbound_node = ctx.message_router.get_outbound_propagation_node() + local_propagation_destination = getattr( + ctx.message_router, + "propagation_destination", + None, + ) + local_propagation_hash = getattr(local_propagation_destination, "hash", None) + if ( + isinstance(outbound_node, (bytes, bytearray)) + and isinstance(local_propagation_hash, (bytes, bytearray)) + and bytes(outbound_node) == bytes(local_propagation_hash) + ): + # Local node selected as preferred: no transport path lookup is needed. + # Mark sync as complete immediately to avoid getting stuck in PR_PATH_REQUESTED. + with contextlib.suppress(Exception): + ctx.message_router.propagation_transfer_state = ( + ctx.message_router.PR_COMPLETE + ) + ctx.message_router.propagation_transfer_progress = 1.0 + ctx.message_router.propagation_transfer_last_result = 0 + await self.send_config_to_websocket_clients(context=ctx) + return + # request messages from propagation node ctx.message_router.request_messages_from_propagation_node(ctx.identity) diff --git a/tests/backend/test_lxmf_propagation_full.py b/tests/backend/test_lxmf_propagation_full.py index e55a0d6..7db609a 100644 --- a/tests/backend/test_lxmf_propagation_full.py +++ b/tests/backend/test_lxmf_propagation_full.py @@ -3,6 +3,7 @@ import json import shutil import tempfile +from types import SimpleNamespace from unittest.mock import MagicMock, patch import LXMF @@ -137,6 +138,46 @@ async def test_lxmf_sync_flow(mock_app): assert data["propagation_node_status"]["progress"] == 75.0 +@pytest.mark.asyncio +async def test_lxmf_sync_requests_path_before_sync(mock_app): + mock_router = mock_app.current_context.message_router + outbound = b"somehash" + mock_router.get_outbound_propagation_node.return_value = outbound + sync_handler = next( + r.handler + for r in mock_app.get_routes() + if r.path == "/api/v1/lxmf/propagation-node/sync" + ) + + with patch("meshchatx.meshchat.RNS.Transport.has_path", return_value=False): + with patch("meshchatx.meshchat.RNS.Transport.request_path") as mock_request: + await sync_handler(None) + mock_request.assert_called_with(outbound) + mock_router.request_messages_from_propagation_node.assert_called_with( + mock_app.current_context.identity + ) + + +@pytest.mark.asyncio +async def test_lxmf_sync_completes_immediately_for_local_preferred_node(mock_app): + mock_router = mock_app.current_context.message_router + local_hash = b"local-prop-node-1" + mock_router.propagation_destination = SimpleNamespace(hash=local_hash) + mock_router.get_outbound_propagation_node.return_value = local_hash + sync_handler = next( + r.handler + for r in mock_app.get_routes() + if r.path == "/api/v1/lxmf/propagation-node/sync" + ) + + await sync_handler(None) + + assert mock_router.propagation_transfer_state == PR_COMPLETE + assert mock_router.propagation_transfer_progress == 1.0 + assert mock_router.propagation_transfer_last_result == 0 + mock_router.request_messages_from_propagation_node.assert_not_called() + + @pytest.mark.asyncio async def test_hosting_prop_node(mock_app): mock_router = mock_app.current_context.message_router @@ -214,6 +255,11 @@ async def test_local_propagation_node_stop_and_restart_routes(mock_app): "client_propagation_messages_received": 7, "client_propagation_messages_served": 5, }, + "peers": { + "peer-a": {"rx_bytes": 1000, "tx_bytes": 600}, + "peer-b": {"rx_bytes": 300, "tx_bytes": 400}, + }, + "unpeered_propagation_rx_bytes": 200, "delivery_limit": 10, "propagation_limit": 20, "sync_limit": 30, @@ -238,11 +284,19 @@ async def test_local_propagation_node_stop_and_restart_routes(mock_app): stop_response = await stop_handler(None) stop_data = json.loads(stop_response.body) assert stop_data["message"] == "Local propagation node stopped" + local_stop = stop_data["local_propagation_node"] + assert local_stop["rx_bytes"] == 1500 + assert local_stop["tx_bytes"] == 1000 + assert local_stop["messagestore_limit_bytes"] == 4096 + assert local_stop["client_messages_received"] == 7 mock_router.disable_propagation.assert_called() restart_response = await restart_handler(None) restart_data = json.loads(restart_response.body) assert restart_data["message"] == "Local propagation node restarted" + local_restart = restart_data["local_propagation_node"] + assert local_restart["rx_bytes"] == 1500 + assert local_restart["tx_bytes"] == 1000 mock_router.enable_propagation.assert_called() @@ -252,9 +306,11 @@ async def test_user_provided_node_hash(mock_app): node_hash_hex = "d81255ae2ff367d4883b16c9cc8c6178" # Set this node as preferred - await mock_app.update_config( - {"lxmf_preferred_propagation_node_destination_hash": node_hash_hex}, - ) + with patch("meshchatx.meshchat.RNS.Transport.request_path") as mock_request_path: + await mock_app.update_config( + {"lxmf_preferred_propagation_node_destination_hash": node_hash_hex}, + ) + mock_request_path.assert_called_with(bytes.fromhex(node_hash_hex)) # Check if the router was updated with the correct bytes mock_app.current_context.message_router.set_outbound_propagation_node.assert_called_with( @@ -278,6 +334,26 @@ async def test_user_provided_node_hash(mock_app): ) +@pytest.mark.asyncio +async def test_destination_path_returns_local_hop_zero_for_local_destinations(mock_app): + local_hash = mock_app.current_context.identity.hash.hex() + path_handler = next( + r.handler + for r in mock_app.get_routes() + if r.path == "/api/v1/destination/{destination_hash}/path" + ) + request = SimpleNamespace( + match_info={"destination_hash": local_hash}, + query={}, + ) + + response = await path_handler(request) + data = json.loads(response.body) + assert data["path"]["hops"] == 0 + assert data["path"]["next_hop"] == local_hash + assert data["path"]["next_hop_interface"] == "Local" + + def test_convert_propagation_node_state_maps_all_lxmf_transfer_states(): from meshchatx.src.backend.meshchat_utils import ( convert_propagation_node_state_to_string, diff --git a/tests/backend/test_lxmf_propagation_sync_integration.py b/tests/backend/test_lxmf_propagation_sync_integration.py new file mode 100644 index 0000000..01c9f1c --- /dev/null +++ b/tests/backend/test_lxmf_propagation_sync_integration.py @@ -0,0 +1,171 @@ +# SPDX-License-Identifier: 0BSD + +import json +import shutil +import tempfile +from types import SimpleNamespace +from unittest.mock import patch + +import LXMF +import pytest +import RNS + +from meshchatx.meshchat import ReticulumMeshChat + +PR_IDLE = LXMF.LXMRouter.PR_IDLE +PR_PATH_REQUESTED = LXMF.LXMRouter.PR_PATH_REQUESTED +PR_PATH_TIMEOUT = LXMF.LXMRouter.PR_PATH_TIMEOUT +PR_RECEIVING = LXMF.LXMRouter.PR_RECEIVING +PR_COMPLETE = LXMF.LXMRouter.PR_COMPLETE +PR_FAILED = LXMF.LXMRouter.PR_FAILED + + +class FakePropagationRouter: + PR_IDLE = PR_IDLE + PR_PATH_REQUESTED = PR_PATH_REQUESTED + PR_COMPLETE = PR_COMPLETE + PR_FAILED = PR_FAILED + + def __init__(self, local_hash: bytes): + self.outbound_propagation_node = None + self.propagation_destination = SimpleNamespace( + hash=local_hash, + hexhash=local_hash.hex(), + ) + self.propagation_transfer_state = self.PR_IDLE + self.propagation_transfer_progress = 0.0 + self.propagation_transfer_last_result = 0 + self.request_messages_calls = 0 + self.propagation_node = False + + def set_outbound_propagation_node(self, destination_hash: bytes): + self.outbound_propagation_node = destination_hash + + def get_outbound_propagation_node(self): + return self.outbound_propagation_node + + def request_messages_from_propagation_node(self, _identity): + self.request_messages_calls += 1 + if self.outbound_propagation_node is None: + self.propagation_transfer_state = self.PR_FAILED + self.propagation_transfer_progress = 0.0 + return + + if RNS.Transport.has_path(self.outbound_propagation_node): + self.propagation_transfer_state = self.PR_COMPLETE + self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_result = 3 + return + + RNS.Transport.request_path(self.outbound_propagation_node) + self.propagation_transfer_state = self.PR_PATH_REQUESTED + self.propagation_transfer_progress = 0.0 + + +@pytest.fixture +def temp_dir(): + dir_path = tempfile.mkdtemp() + yield dir_path + shutil.rmtree(dir_path) + + +@pytest.fixture +def integration_app(temp_dir): + with ( + patch("RNS.Reticulum") as mock_rns, + patch("RNS.Transport"), + patch("LXMF.LXMRouter"), + patch("meshchatx.meshchat.get_file_path", return_value="/tmp/mock_path"), + patch("meshchatx.meshchat.generate_ssl_certificate"), + patch("meshchatx.src.backend.meshchat_utils.LXMRouter") as mock_utils_router, + ): + mock_rns.return_value.transport_enabled.return_value = False + mock_utils_router.PR_IDLE = PR_IDLE + mock_utils_router.PR_PATH_REQUESTED = PR_PATH_REQUESTED + mock_utils_router.PR_PATH_TIMEOUT = PR_PATH_TIMEOUT + mock_utils_router.PR_RECEIVING = PR_RECEIVING + mock_utils_router.PR_COMPLETE = PR_COMPLETE + mock_utils_router.PR_FAILED = PR_FAILED + + app = ReticulumMeshChat( + identity=RNS.Identity(), + storage_dir=temp_dir, + reticulum_config_dir=temp_dir, + ) + fake_router = FakePropagationRouter(local_hash=b"\x11" * 16) + app.current_context.message_router = fake_router + + with patch.object( + app, + "send_config_to_websocket_clients", + return_value=None, + ): + yield app, fake_router + + +def _route_handler(app, path, method="GET"): + return next(r.handler for r in app.get_routes() if r.path == path and r.method == method) + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_remote_propagation_sync_transitions_path_requested_to_complete(integration_app): + app, fake_router = integration_app + remote_hash = b"\x22" * 16 + fake_router.set_outbound_propagation_node(remote_hash) + + known_paths = set() + request_count = {} + + def has_path(destination_hash): + return bytes(destination_hash) in known_paths + + def request_path(destination_hash): + key = bytes(destination_hash) + request_count[key] = request_count.get(key, 0) + 1 + if request_count[key] >= 2: + known_paths.add(key) + + sync_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/sync") + status_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/status") + + with ( + patch("meshchatx.meshchat.RNS.Transport.has_path", side_effect=has_path), + patch("meshchatx.meshchat.RNS.Transport.request_path", side_effect=request_path), + ): + first_sync = await sync_handler(None) + assert first_sync.status == 200 + + first_status = json.loads((await status_handler(None)).body)["propagation_node_status"] + assert first_status["state"] == "path_requested" + + second_sync = await sync_handler(None) + assert second_sync.status == 200 + + second_status = json.loads((await status_handler(None)).body)["propagation_node_status"] + assert second_status["state"] == "complete" + assert second_status["progress"] == 100.0 + assert fake_router.request_messages_calls >= 2 + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_local_preferred_propagation_sync_completes_without_remote_lookup(integration_app): + app, fake_router = integration_app + local_hash = fake_router.propagation_destination.hash + fake_router.set_outbound_propagation_node(local_hash) + + sync_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/sync") + status_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/status") + + with ( + patch("meshchatx.meshchat.RNS.Transport.has_path", return_value=False), + patch("meshchatx.meshchat.RNS.Transport.request_path"), + ): + response = await sync_handler(None) + assert response.status == 200 + + status_data = json.loads((await status_handler(None)).body)["propagation_node_status"] + assert status_data["state"] == "complete" + assert status_data["progress"] == 100.0 + assert fake_router.request_messages_calls == 0 diff --git a/tests/backend/test_propagation_nodes_robustness.py b/tests/backend/test_propagation_nodes_robustness.py index d75edcf..db53b26 100644 --- a/tests/backend/test_propagation_nodes_robustness.py +++ b/tests/backend/test_propagation_nodes_robustness.py @@ -65,6 +65,12 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir) assert "lxmf_propagation_nodes" in data for node in data["lxmf_propagation_nodes"]: assert "is_local_node" in node + if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict): + if isinstance(node["local_node_stats"].get("is_running"), bool): + assert ( + node.get("is_propagation_enabled") + == node["local_node_stats"]["is_running"] + ) # Test with invalid limit (should not crash) request.query = {"limit": "invalid"} @@ -74,6 +80,12 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir) assert "lxmf_propagation_nodes" in data for node in data["lxmf_propagation_nodes"]: assert "is_local_node" in node + if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict): + if isinstance(node["local_node_stats"].get("is_running"), bool): + assert ( + node.get("is_propagation_enabled") + == node["local_node_stats"]["is_running"] + ) # Test with missing limit (should not crash) request.query = {} @@ -83,3 +95,9 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir) assert "lxmf_propagation_nodes" in data for node in data["lxmf_propagation_nodes"]: assert "is_local_node" in node + if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict): + if isinstance(node["local_node_stats"].get("is_running"), bool): + assert ( + node.get("is_propagation_enabled") + == node["local_node_stats"]["is_running"] + )