From ca70cce7449d78c7abb8eca1ddbb20a16e797ca6 Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 13 Apr 2026 14:32:24 -0500 Subject: [PATCH] feat(auto_propagation): improve auto propagation node selection with path waiting and probing logic --- .../src/backend/auto_propagation_manager.py | 181 +++++++++++------- tests/backend/test_auto_propagation.py | 78 ++++---- 2 files changed, 158 insertions(+), 101 deletions(-) diff --git a/meshchatx/src/backend/auto_propagation_manager.py b/meshchatx/src/backend/auto_propagation_manager.py index e552759..3ed2ab2 100644 --- a/meshchatx/src/backend/auto_propagation_manager.py +++ b/meshchatx/src/backend/auto_propagation_manager.py @@ -2,9 +2,25 @@ import asyncio import time import RNS +from LXMF.LXMRouter import LXMRouter from meshchatx.src.backend.meshchat_utils import parse_lxmf_propagation_node_app_data +_PROP_FAILURE_STATES = frozenset( + { + LXMRouter.PR_NO_PATH, + LXMRouter.PR_LINK_FAILED, + LXMRouter.PR_TRANSFER_FAILED, + LXMRouter.PR_NO_IDENTITY_RCVD, + LXMRouter.PR_NO_ACCESS, + LXMRouter.PR_FAILED, + }, +) + +PATH_WAIT_SECONDS = 40.0 +SYNC_PROBE_TIMEOUT_SECONDS = 120.0 +POLL_INTERVAL_SECONDS = 0.2 + class AutoPropagationManager: def __init__(self, app, context): @@ -37,88 +53,119 @@ class AutoPropagationManager: await asyncio.sleep(self._check_interval) + async def _wait_for_path(self, dest_hash: bytes, timeout: float) -> bool: + if RNS.Transport.has_path(dest_hash): + return True + RNS.Transport.request_path(dest_hash) + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if RNS.Transport.has_path(dest_hash): + return True + await asyncio.sleep(POLL_INTERVAL_SECONDS) + return RNS.Transport.has_path(dest_hash) + + async def _probe_propagation_sync(self, node_hex: str) -> bool: + ctx = self.context + router = ctx.message_router + try: + dest = bytes.fromhex(node_hex) + if len(dest) != RNS.Identity.TRUNCATED_HASHLENGTH // 8: + return False + except Exception: + return False + + self.app.stop_propagation_node_sync(context=ctx) + try: + router.set_outbound_propagation_node(dest) + except Exception: + return False + + router.request_messages_from_propagation_node(ctx.identity) + + deadline = time.monotonic() + SYNC_PROBE_TIMEOUT_SECONDS + seen_progress = False + + while time.monotonic() < deadline: + state = router.propagation_transfer_state + if state in _PROP_FAILURE_STATES: + self.app.stop_propagation_node_sync(context=ctx) + return False + if state != LXMRouter.PR_IDLE: + seen_progress = True + elif seen_progress: + self.app.stop_propagation_node_sync(context=ctx) + return True + await asyncio.sleep(POLL_INTERVAL_SECONDS) + + self.app.stop_propagation_node_sync(context=ctx) + return False + async def check_and_update_propagation_node(self): - # Get all propagation node announces + ctx = self.context + router = ctx.message_router + + if router.propagation_transfer_state != LXMRouter.PR_IDLE: + return + announces = self.database.announces.get_announces(aspect="lxmf.propagation") - nodes_with_hops = [] + best_by_hex: dict[str, tuple[int, str]] = {} for announce in announces: - dest_hash_hex = announce["destination_hash"] - dest_hash = bytes.fromhex(dest_hash_hex) - - # Check if propagation is enabled for this node + dest_hex = announce["destination_hash"] node_data = parse_lxmf_propagation_node_app_data(announce["app_data"]) if not node_data or not node_data.get("enabled", False): continue - + try: + dest_hash = bytes.fromhex(dest_hex) + except Exception: + continue if RNS.Transport.has_path(dest_hash): hops = RNS.Transport.hops_to(dest_hash) - nodes_with_hops.append((hops, dest_hash_hex)) + else: + hops = 10**9 + prev = best_by_hex.get(dest_hex) + if prev is None or hops < prev[0]: + best_by_hex[dest_hex] = (hops, dest_hex) - # Sort by hops (lowest first) - nodes_with_hops.sort() - - current_node = ( - self.config.lxmf_preferred_propagation_node_destination_hash.get() - ) - - if not nodes_with_hops: + sorted_candidates = sorted(best_by_hex.values(), key=lambda x: x[0]) + if not sorted_candidates: return - # Try nodes in order of hops until we find a reachable one - for hops, node_hex in nodes_with_hops: - # If current node is already the best and we have it, check if we should keep it - if node_hex == current_node: - # We could probe it to be sure, but for now let's assume it's fine if it's the best - return + previous_hex = self.config.lxmf_preferred_propagation_node_destination_hash.get() + ordered: list[tuple[int, str]] = [] + seen_hex: set[str] = set() + if previous_hex and previous_hex in best_by_hex: + ordered.append(best_by_hex[previous_hex]) + seen_hex.add(previous_hex) + for item in sorted_candidates: + if item[1] not in seen_hex: + ordered.append(item) + seen_hex.add(item[1]) - # Before switching to a new "best" node, try to probe it to ensure it's actually reachable + for _hops, node_hex in ordered: try: dest_hash = bytes.fromhex(node_hex) - # We use a short timeout for the probe - if await self.probe_node(dest_hash): - print( - f"Auto-propagation: Switching to better node {node_hex} ({hops} hops) for {self.context.identity_hash}", - ) - self.app.set_active_propagation_node(node_hex, context=self.context) - self.config.lxmf_preferred_propagation_node_destination_hash.set( - node_hex, - ) - return + except Exception: + continue + + if not await self._wait_for_path(dest_hash, PATH_WAIT_SECONDS): + continue + + if not await self._probe_propagation_sync(node_hex): + continue + + if node_hex != previous_hex: print( - f"Auto-propagation: Node {node_hex} announced but probe failed, trying next...", + f"Auto-propagation: Switching to verified node {node_hex} " + f"for {self.context.identity_hash}", ) - except Exception as e: - print(f"Auto-propagation: Error probing node {node_hex}: {e}") + self.app.set_active_propagation_node(node_hex, context=self.context) + self.config.lxmf_preferred_propagation_node_destination_hash.set( + node_hex, + ) + return - async def probe_node(self, destination_hash): - """Probes a destination to see if it's reachable.""" - try: - # We use the app's probe handler if available - if ( - hasattr(self.context, "rnprobe_handler") - and self.context.rnprobe_handler - ): - # Re-using the logic from RNProbeHandler but simplified - if not RNS.Transport.has_path(destination_hash): - RNS.Transport.request_path(destination_hash) - - # Wait a bit for path - timeout = 5 - start = time.time() - while ( - not RNS.Transport.has_path(destination_hash) - and time.time() - start < timeout - ): - await asyncio.sleep(0.5) - - if not RNS.Transport.has_path(destination_hash): - return False - - # If we have a path, it's a good sign. - # For propagation nodes, having a path is often enough to try using it. - return True - - return RNS.Transport.has_path(destination_hash) - except Exception: - return False + if previous_hex: + self.app.set_active_propagation_node(previous_hex, context=self.context) + else: + self.app.remove_active_propagation_node(context=self.context) diff --git a/tests/backend/test_auto_propagation.py b/tests/backend/test_auto_propagation.py index 035c254..f3ff204 100644 --- a/tests/backend/test_auto_propagation.py +++ b/tests/backend/test_auto_propagation.py @@ -2,13 +2,19 @@ from unittest.mock import MagicMock, patch import pytest import RNS +from LXMF.LXMRouter import LXMRouter from meshchatx.src.backend.auto_propagation_manager import AutoPropagationManager +_VALID_HASH_A = "01" * 16 +_VALID_HASH_B = "02" * 16 +_VALID_HASH_C = "03" * 16 + +_APP_DATA_ENABLED = b"\x94\x00\x00\x01\x00" + @pytest.mark.asyncio async def test_auto_propagation_logic(): - # Mock dependencies app = MagicMock() context = MagicMock() config = MagicMock() @@ -18,88 +24,92 @@ async def test_auto_propagation_logic(): context.database = database context.identity_hash = "test_identity" context.running = True + context.message_router = MagicMock() + context.message_router.propagation_transfer_state = LXMRouter.PR_IDLE manager = AutoPropagationManager(app, context) - # 1. Test disabled state config.lxmf_preferred_propagation_node_auto_select.get.return_value = False with patch.object(manager, "check_and_update_propagation_node") as mock_check: - # Run one iteration manually if config.lxmf_preferred_propagation_node_auto_select.get(): await manager.check_and_update_propagation_node() mock_check.assert_not_called() - # 2. Test selection logic config.lxmf_preferred_propagation_node_auto_select.get.return_value = True config.lxmf_preferred_propagation_node_destination_hash.get.return_value = None - # Mock announces announce1 = { - "destination_hash": "aaaa1111", - "app_data": b"\x94\x00\x00\x01\x00", # msgpack for [0, 0, 1, 0] -> enabled=True + "destination_hash": _VALID_HASH_A, + "app_data": _APP_DATA_ENABLED, + } + announce2 = { + "destination_hash": _VALID_HASH_B, + "app_data": _APP_DATA_ENABLED, } - announce2 = {"destination_hash": "bbbb2222", "app_data": b"\x94\x00\x00\x01\x00"} database.announces.get_announces.return_value = [announce1, announce2] - # Mock RNS Transport with ( patch.object(RNS.Transport, "has_path", return_value=True), patch.object(RNS.Transport, "hops_to") as mock_hops, - patch.object(manager, "probe_node", return_value=True), + patch.object(manager, "_wait_for_path", return_value=True), + patch.object(manager, "_probe_propagation_sync", return_value=True), ): - # announce1 is closer (1 hop) - # announce2 is further (3 hops) - mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 3 + mock_hops.side_effect = ( + lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 3 + ) await manager.check_and_update_propagation_node() - # Should have selected aaaa1111 - app.set_active_propagation_node.assert_called_with("aaaa1111", context=context) + app.set_active_propagation_node.assert_called_with( + _VALID_HASH_A, + context=context, + ) config.lxmf_preferred_propagation_node_destination_hash.set.assert_called_with( - "aaaa1111", + _VALID_HASH_A, ) - # 3. Test switching to better node config.lxmf_preferred_propagation_node_destination_hash.get.return_value = ( - "bbbb2222" + _VALID_HASH_B ) app.set_active_propagation_node.reset_mock() with ( patch.object(RNS.Transport, "has_path", return_value=True), patch.object(RNS.Transport, "hops_to") as mock_hops, - patch.object(manager, "probe_node", return_value=True), + patch.object(manager, "_wait_for_path", return_value=True), + patch.object(manager, "_probe_propagation_sync", side_effect=[False, True]), ): - mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 3 + mock_hops.side_effect = ( + lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 3 + ) await manager.check_and_update_propagation_node() - # Should have switched to aaaa1111 because it's closer - app.set_active_propagation_node.assert_called_with("aaaa1111", context=context) + app.set_active_propagation_node.assert_called_with( + _VALID_HASH_A, + context=context, + ) - # 4. Test failover when probe fails config.lxmf_preferred_propagation_node_destination_hash.get.return_value = ( - "cccc3333" + _VALID_HASH_C ) - announce3 = {"destination_hash": "cccc3333", "app_data": b"\x94\x00\x00\x01\x00"} + announce3 = { + "destination_hash": _VALID_HASH_C, + "app_data": _APP_DATA_ENABLED, + } database.announces.get_announces.return_value = [announce1, announce3] app.set_active_propagation_node.reset_mock() with ( patch.object(RNS.Transport, "has_path", return_value=True), patch.object(RNS.Transport, "hops_to") as mock_hops, - patch.object(manager, "probe_node") as mock_probe, + patch.object(manager, "_wait_for_path", return_value=True), + patch.object(manager, "_probe_propagation_sync", return_value=True), ): - # announce1 is 1 hop, but probe fails - # announce3 is 2 hops, probe succeeds - mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 2 - mock_probe.side_effect = lambda dh: ( - False if dh == bytes.fromhex("aaaa1111") else True + mock_hops.side_effect = ( + lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 2 ) await manager.check_and_update_propagation_node() - # Should NOT switch to aaaa1111 because probe failed - # Should STAY on cccc3333 or switch to it if it was different - # Since it's already on cccc3333 and it's the best reachable, no switch app.set_active_propagation_node.assert_not_called()