feat(auto_propagation): improve auto propagation node selection with path waiting and probing logic

This commit is contained in:
Ivan
2026-04-13 14:32:24 -05:00
parent 62e4ec0001
commit ca70cce744
2 changed files with 158 additions and 101 deletions
+114 -67
View File
@@ -2,9 +2,25 @@ import asyncio
import time
import RNS
from LXMF.LXMRouter import LXMRouter
from meshchatx.src.backend.meshchat_utils import parse_lxmf_propagation_node_app_data
_PROP_FAILURE_STATES = frozenset(
{
LXMRouter.PR_NO_PATH,
LXMRouter.PR_LINK_FAILED,
LXMRouter.PR_TRANSFER_FAILED,
LXMRouter.PR_NO_IDENTITY_RCVD,
LXMRouter.PR_NO_ACCESS,
LXMRouter.PR_FAILED,
},
)
PATH_WAIT_SECONDS = 40.0
SYNC_PROBE_TIMEOUT_SECONDS = 120.0
POLL_INTERVAL_SECONDS = 0.2
class AutoPropagationManager:
def __init__(self, app, context):
@@ -37,88 +53,119 @@ class AutoPropagationManager:
await asyncio.sleep(self._check_interval)
async def _wait_for_path(self, dest_hash: bytes, timeout: float) -> bool:
if RNS.Transport.has_path(dest_hash):
return True
RNS.Transport.request_path(dest_hash)
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if RNS.Transport.has_path(dest_hash):
return True
await asyncio.sleep(POLL_INTERVAL_SECONDS)
return RNS.Transport.has_path(dest_hash)
async def _probe_propagation_sync(self, node_hex: str) -> bool:
ctx = self.context
router = ctx.message_router
try:
dest = bytes.fromhex(node_hex)
if len(dest) != RNS.Identity.TRUNCATED_HASHLENGTH // 8:
return False
except Exception:
return False
self.app.stop_propagation_node_sync(context=ctx)
try:
router.set_outbound_propagation_node(dest)
except Exception:
return False
router.request_messages_from_propagation_node(ctx.identity)
deadline = time.monotonic() + SYNC_PROBE_TIMEOUT_SECONDS
seen_progress = False
while time.monotonic() < deadline:
state = router.propagation_transfer_state
if state in _PROP_FAILURE_STATES:
self.app.stop_propagation_node_sync(context=ctx)
return False
if state != LXMRouter.PR_IDLE:
seen_progress = True
elif seen_progress:
self.app.stop_propagation_node_sync(context=ctx)
return True
await asyncio.sleep(POLL_INTERVAL_SECONDS)
self.app.stop_propagation_node_sync(context=ctx)
return False
async def check_and_update_propagation_node(self):
# Get all propagation node announces
ctx = self.context
router = ctx.message_router
if router.propagation_transfer_state != LXMRouter.PR_IDLE:
return
announces = self.database.announces.get_announces(aspect="lxmf.propagation")
nodes_with_hops = []
best_by_hex: dict[str, tuple[int, str]] = {}
for announce in announces:
dest_hash_hex = announce["destination_hash"]
dest_hash = bytes.fromhex(dest_hash_hex)
# Check if propagation is enabled for this node
dest_hex = announce["destination_hash"]
node_data = parse_lxmf_propagation_node_app_data(announce["app_data"])
if not node_data or not node_data.get("enabled", False):
continue
try:
dest_hash = bytes.fromhex(dest_hex)
except Exception:
continue
if RNS.Transport.has_path(dest_hash):
hops = RNS.Transport.hops_to(dest_hash)
nodes_with_hops.append((hops, dest_hash_hex))
else:
hops = 10**9
prev = best_by_hex.get(dest_hex)
if prev is None or hops < prev[0]:
best_by_hex[dest_hex] = (hops, dest_hex)
# Sort by hops (lowest first)
nodes_with_hops.sort()
current_node = (
self.config.lxmf_preferred_propagation_node_destination_hash.get()
)
if not nodes_with_hops:
sorted_candidates = sorted(best_by_hex.values(), key=lambda x: x[0])
if not sorted_candidates:
return
# Try nodes in order of hops until we find a reachable one
for hops, node_hex in nodes_with_hops:
# If current node is already the best and we have it, check if we should keep it
if node_hex == current_node:
# We could probe it to be sure, but for now let's assume it's fine if it's the best
return
previous_hex = self.config.lxmf_preferred_propagation_node_destination_hash.get()
ordered: list[tuple[int, str]] = []
seen_hex: set[str] = set()
if previous_hex and previous_hex in best_by_hex:
ordered.append(best_by_hex[previous_hex])
seen_hex.add(previous_hex)
for item in sorted_candidates:
if item[1] not in seen_hex:
ordered.append(item)
seen_hex.add(item[1])
# Before switching to a new "best" node, try to probe it to ensure it's actually reachable
for _hops, node_hex in ordered:
try:
dest_hash = bytes.fromhex(node_hex)
# We use a short timeout for the probe
if await self.probe_node(dest_hash):
print(
f"Auto-propagation: Switching to better node {node_hex} ({hops} hops) for {self.context.identity_hash}",
)
self.app.set_active_propagation_node(node_hex, context=self.context)
self.config.lxmf_preferred_propagation_node_destination_hash.set(
node_hex,
)
return
except Exception:
continue
if not await self._wait_for_path(dest_hash, PATH_WAIT_SECONDS):
continue
if not await self._probe_propagation_sync(node_hex):
continue
if node_hex != previous_hex:
print(
f"Auto-propagation: Node {node_hex} announced but probe failed, trying next...",
f"Auto-propagation: Switching to verified node {node_hex} "
f"for {self.context.identity_hash}",
)
except Exception as e:
print(f"Auto-propagation: Error probing node {node_hex}: {e}")
self.app.set_active_propagation_node(node_hex, context=self.context)
self.config.lxmf_preferred_propagation_node_destination_hash.set(
node_hex,
)
return
async def probe_node(self, destination_hash):
"""Probes a destination to see if it's reachable."""
try:
# We use the app's probe handler if available
if (
hasattr(self.context, "rnprobe_handler")
and self.context.rnprobe_handler
):
# Re-using the logic from RNProbeHandler but simplified
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
# Wait a bit for path
timeout = 5
start = time.time()
while (
not RNS.Transport.has_path(destination_hash)
and time.time() - start < timeout
):
await asyncio.sleep(0.5)
if not RNS.Transport.has_path(destination_hash):
return False
# If we have a path, it's a good sign.
# For propagation nodes, having a path is often enough to try using it.
return True
return RNS.Transport.has_path(destination_hash)
except Exception:
return False
if previous_hex:
self.app.set_active_propagation_node(previous_hex, context=self.context)
else:
self.app.remove_active_propagation_node(context=self.context)
+44 -34
View File
@@ -2,13 +2,19 @@ from unittest.mock import MagicMock, patch
import pytest
import RNS
from LXMF.LXMRouter import LXMRouter
from meshchatx.src.backend.auto_propagation_manager import AutoPropagationManager
_VALID_HASH_A = "01" * 16
_VALID_HASH_B = "02" * 16
_VALID_HASH_C = "03" * 16
_APP_DATA_ENABLED = b"\x94\x00\x00\x01\x00"
@pytest.mark.asyncio
async def test_auto_propagation_logic():
# Mock dependencies
app = MagicMock()
context = MagicMock()
config = MagicMock()
@@ -18,88 +24,92 @@ async def test_auto_propagation_logic():
context.database = database
context.identity_hash = "test_identity"
context.running = True
context.message_router = MagicMock()
context.message_router.propagation_transfer_state = LXMRouter.PR_IDLE
manager = AutoPropagationManager(app, context)
# 1. Test disabled state
config.lxmf_preferred_propagation_node_auto_select.get.return_value = False
with patch.object(manager, "check_and_update_propagation_node") as mock_check:
# Run one iteration manually
if config.lxmf_preferred_propagation_node_auto_select.get():
await manager.check_and_update_propagation_node()
mock_check.assert_not_called()
# 2. Test selection logic
config.lxmf_preferred_propagation_node_auto_select.get.return_value = True
config.lxmf_preferred_propagation_node_destination_hash.get.return_value = None
# Mock announces
announce1 = {
"destination_hash": "aaaa1111",
"app_data": b"\x94\x00\x00\x01\x00", # msgpack for [0, 0, 1, 0] -> enabled=True
"destination_hash": _VALID_HASH_A,
"app_data": _APP_DATA_ENABLED,
}
announce2 = {
"destination_hash": _VALID_HASH_B,
"app_data": _APP_DATA_ENABLED,
}
announce2 = {"destination_hash": "bbbb2222", "app_data": b"\x94\x00\x00\x01\x00"}
database.announces.get_announces.return_value = [announce1, announce2]
# Mock RNS Transport
with (
patch.object(RNS.Transport, "has_path", return_value=True),
patch.object(RNS.Transport, "hops_to") as mock_hops,
patch.object(manager, "probe_node", return_value=True),
patch.object(manager, "_wait_for_path", return_value=True),
patch.object(manager, "_probe_propagation_sync", return_value=True),
):
# announce1 is closer (1 hop)
# announce2 is further (3 hops)
mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 3
mock_hops.side_effect = (
lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 3
)
await manager.check_and_update_propagation_node()
# Should have selected aaaa1111
app.set_active_propagation_node.assert_called_with("aaaa1111", context=context)
app.set_active_propagation_node.assert_called_with(
_VALID_HASH_A,
context=context,
)
config.lxmf_preferred_propagation_node_destination_hash.set.assert_called_with(
"aaaa1111",
_VALID_HASH_A,
)
# 3. Test switching to better node
config.lxmf_preferred_propagation_node_destination_hash.get.return_value = (
"bbbb2222"
_VALID_HASH_B
)
app.set_active_propagation_node.reset_mock()
with (
patch.object(RNS.Transport, "has_path", return_value=True),
patch.object(RNS.Transport, "hops_to") as mock_hops,
patch.object(manager, "probe_node", return_value=True),
patch.object(manager, "_wait_for_path", return_value=True),
patch.object(manager, "_probe_propagation_sync", side_effect=[False, True]),
):
mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 3
mock_hops.side_effect = (
lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 3
)
await manager.check_and_update_propagation_node()
# Should have switched to aaaa1111 because it's closer
app.set_active_propagation_node.assert_called_with("aaaa1111", context=context)
app.set_active_propagation_node.assert_called_with(
_VALID_HASH_A,
context=context,
)
# 4. Test failover when probe fails
config.lxmf_preferred_propagation_node_destination_hash.get.return_value = (
"cccc3333"
_VALID_HASH_C
)
announce3 = {"destination_hash": "cccc3333", "app_data": b"\x94\x00\x00\x01\x00"}
announce3 = {
"destination_hash": _VALID_HASH_C,
"app_data": _APP_DATA_ENABLED,
}
database.announces.get_announces.return_value = [announce1, announce3]
app.set_active_propagation_node.reset_mock()
with (
patch.object(RNS.Transport, "has_path", return_value=True),
patch.object(RNS.Transport, "hops_to") as mock_hops,
patch.object(manager, "probe_node") as mock_probe,
patch.object(manager, "_wait_for_path", return_value=True),
patch.object(manager, "_probe_propagation_sync", return_value=True),
):
# announce1 is 1 hop, but probe fails
# announce3 is 2 hops, probe succeeds
mock_hops.side_effect = lambda dh: 1 if dh == bytes.fromhex("aaaa1111") else 2
mock_probe.side_effect = lambda dh: (
False if dh == bytes.fromhex("aaaa1111") else True
mock_hops.side_effect = (
lambda dh: 1 if dh == bytes.fromhex(_VALID_HASH_A) else 2
)
await manager.check_and_update_propagation_node()
# Should NOT switch to aaaa1111 because probe failed
# Should STAY on cccc3333 or switch to it if it was different
# Since it's already on cccc3333 and it's the best reachable, no switch
app.set_active_propagation_node.assert_not_called()