feat(sync): propagation node sync process with path requests and local node handling

- Implemented proactive path requests for outbound propagation nodes during sync operations.
- Added logic to handle local nodes, allowing immediate sync completion without remote lookups.
- Updated statistics collection for peers and unpeered nodes to improve monitoring capabilities.
This commit is contained in:
Ivan
2026-04-16 21:35:46 -05:00
parent ab1be8ea2d
commit 20136e3088
4 changed files with 436 additions and 16 deletions
+168 -13
View File
@@ -2222,9 +2222,12 @@ class ReticulumMeshChat:
# set outbound propagation node
if destination_hash is not None and destination_hash != "":
try:
destination_hash_bytes = bytes.fromhex(destination_hash)
ctx.message_router.set_outbound_propagation_node(
bytes.fromhex(destination_hash),
destination_hash_bytes,
)
with contextlib.suppress(Exception):
RNS.Transport.request_path(destination_hash_bytes)
except Exception:
# failed to set propagation node, clear it to ensure we don't use an old one by mistake
self.remove_active_propagation_node(context=ctx)
@@ -2402,7 +2405,21 @@ class ReticulumMeshChat:
message_store = stats.get("messagestore", {}) if isinstance(stats, dict) else {}
clients = stats.get("clients", {}) if isinstance(stats, dict) else {}
peers = stats.get("peers", {}) if isinstance(stats, dict) else {}
uptime = _numeric(stats.get("uptime", 0)) if isinstance(stats, dict) else 0
peer_rx_bytes = 0
peer_tx_bytes = 0
if isinstance(peers, dict):
for peer_stats in peers.values():
if not isinstance(peer_stats, dict):
continue
peer_rx_bytes += int(_numeric(peer_stats.get("rx_bytes", 0)))
peer_tx_bytes += int(_numeric(peer_stats.get("tx_bytes", 0)))
unpeered_rx_bytes = (
int(_numeric(stats.get("unpeered_propagation_rx_bytes", 0)))
if isinstance(stats, dict)
else 0
)
delivery_limit = (
_numeric(stats.get("delivery_limit", 0))
if isinstance(stats, dict)
@@ -2434,6 +2451,9 @@ class ReticulumMeshChat:
"client_propagation_messages_served",
0,
),
"rx_bytes": peer_rx_bytes + unpeered_rx_bytes,
"tx_bytes": peer_tx_bytes,
"unpeered_rx_bytes": unpeered_rx_bytes,
"static_peers": stats.get("static_peers", 0)
if isinstance(stats, dict)
else 0,
@@ -7349,7 +7369,8 @@ class ReticulumMeshChat:
@routes.get("/api/v1/lxmf/propagation-node/sync")
async def propagation_node_sync(request):
# ensure propagation node is configured before attempting to sync
if self.message_router.get_outbound_propagation_node() is None:
outbound_node = self.message_router.get_outbound_propagation_node()
if outbound_node is None:
return web.json_response(
{
"message": "A propagation node must be configured to sync messages.",
@@ -7357,6 +7378,12 @@ class ReticulumMeshChat:
status=400,
)
# proactively request path, but do not block/fail here.
# LXMF internally manages PR_PATH_REQUESTED and retries.
if not RNS.Transport.has_path(outbound_node):
with contextlib.suppress(Exception):
RNS.Transport.request_path(outbound_node)
# request messages from propagation node
await self.sync_propagation_nodes()
@@ -7500,6 +7527,15 @@ class ReticulumMeshChat:
if updated_at and "+" not in updated_at and "Z" not in updated_at:
updated_at += "Z"
is_local_node = (
announce["identity_hash"] == local_identity_hash
or announce["destination_hash"] == local_destination_hash
)
if is_local_node and isinstance(local_stats, dict):
local_running = local_stats.get("is_running")
if isinstance(local_running, bool):
is_propagation_enabled = local_running
lxmf_propagation_nodes.append(
{
"destination_hash": announce["destination_hash"],
@@ -7507,16 +7543,8 @@ class ReticulumMeshChat:
"operator_display_name": operator_display_name,
"is_propagation_enabled": is_propagation_enabled,
"per_transfer_limit": per_transfer_limit,
"is_local_node": (
announce["identity_hash"] == local_identity_hash
or announce["destination_hash"] == local_destination_hash
),
"local_node_stats": (
local_stats
if announce["identity_hash"] == local_identity_hash
or announce["destination_hash"] == local_destination_hash
else None
),
"is_local_node": is_local_node,
"local_node_stats": (local_stats if is_local_node else None),
"created_at": created_at,
"updated_at": updated_at,
},
@@ -7537,7 +7565,12 @@ class ReticulumMeshChat:
"destination_hash": local_destination_hash,
"identity_hash": local_identity_hash,
"operator_display_name": ctx.config.display_name.get(),
"is_propagation_enabled": ctx.config.lxmf_local_propagation_node_enabled.get(),
"is_propagation_enabled": (
local_stats.get("is_running")
if isinstance(local_stats, dict)
and isinstance(local_stats.get("is_running"), bool)
else ctx.config.lxmf_local_propagation_node_enabled.get()
),
"per_transfer_limit": int(
getattr(
ctx.message_router, "propagation_per_transfer_limit", 0
@@ -7564,6 +7597,30 @@ class ReticulumMeshChat:
# convert destination hash to bytes
destination_hash = bytes.fromhex(destination_hash)
destination_hash_hex = destination_hash.hex()
local_hashes: set[str] = set()
with contextlib.suppress(Exception):
if self.current_context and self.current_context.identity:
local_hashes.add(self.current_context.identity.hash.hex())
with contextlib.suppress(Exception):
if self.local_lxmf_destination is not None:
local_hashes.add(self.local_lxmf_destination.hash.hex())
with contextlib.suppress(Exception):
if self.current_context and self.current_context.message_router:
pdest = self.current_context.message_router.propagation_destination
if pdest is not None and getattr(pdest, "hash", None):
local_hashes.add(pdest.hash.hex())
if destination_hash_hex in local_hashes:
return web.json_response(
{
"path": {
"hops": 0,
"next_hop": destination_hash_hex,
"next_hop_interface": "Local",
},
},
)
# check if user wants to request the path from the network right now
request_query_param = request.query.get("request", "false")
@@ -8442,6 +8499,25 @@ class ReticulumMeshChat:
try:
status = self.bot_handler.get_status()
templates = self.bot_handler.get_available_templates()
if self.database:
for bot in status.get("bots") or []:
lxmf_addr = bot.get("lxmf_address") or bot.get("full_address")
if not lxmf_addr:
bot["last_announce_at"] = None
continue
lxmf_addr = str(lxmf_addr).strip().lower()
ann = self.database.announces.get_announce_by_hash(lxmf_addr)
if not ann:
bot["last_announce_at"] = None
continue
arow = dict(ann) if not isinstance(ann, dict) else ann
ts = arow.get("updated_at")
if ts is not None and hasattr(ts, "isoformat"):
bot["last_announce_at"] = ts.isoformat()
else:
bot["last_announce_at"] = (
str(ts) if ts is not None else None
)
return web.json_response(
{
"status": status,
@@ -8541,6 +8617,62 @@ class ReticulumMeshChat:
status=500,
)
@routes.patch("/api/v1/bots/update")
async def bots_update(request):
data = await request.json()
bot_id = data.get("bot_id")
name = data.get("name")
if not bot_id:
return web.json_response(
{"message": "bot_id is required"},
status=400,
)
try:
self.bot_handler.update_bot_name(bot_id, name)
return web.json_response({"success": True})
except ValueError as e:
return web.json_response(
{"message": str(e)},
status=400,
)
except Exception as e:
return web.json_response(
{"message": str(e)},
status=500,
)
@routes.post("/api/v1/bots/announce")
async def bots_announce(request):
data = await request.json()
bot_id = data.get("bot_id")
if not bot_id:
return web.json_response(
{"message": "bot_id is required"},
status=400,
)
try:
self.bot_handler.request_announce(bot_id)
return web.json_response({"success": True})
except ValueError as e:
return web.json_response(
{"message": str(e)},
status=400,
)
except RuntimeError as e:
return web.json_response(
{"message": str(e)},
status=409,
)
except Exception as e:
return web.json_response(
{"message": str(e)},
status=500,
)
@routes.get("/api/v1/bots/export")
async def bots_export(request):
bot_id = request.query.get("bot_id")
@@ -10816,6 +10948,29 @@ class ReticulumMeshChat:
# update last synced at timestamp
ctx.config.lxmf_preferred_propagation_node_last_synced_at.set(int(time.time()))
outbound_node = ctx.message_router.get_outbound_propagation_node()
local_propagation_destination = getattr(
ctx.message_router,
"propagation_destination",
None,
)
local_propagation_hash = getattr(local_propagation_destination, "hash", None)
if (
isinstance(outbound_node, (bytes, bytearray))
and isinstance(local_propagation_hash, (bytes, bytearray))
and bytes(outbound_node) == bytes(local_propagation_hash)
):
# Local node selected as preferred: no transport path lookup is needed.
# Mark sync as complete immediately to avoid getting stuck in PR_PATH_REQUESTED.
with contextlib.suppress(Exception):
ctx.message_router.propagation_transfer_state = (
ctx.message_router.PR_COMPLETE
)
ctx.message_router.propagation_transfer_progress = 1.0
ctx.message_router.propagation_transfer_last_result = 0
await self.send_config_to_websocket_clients(context=ctx)
return
# request messages from propagation node
ctx.message_router.request_messages_from_propagation_node(ctx.identity)
+79 -3
View File
@@ -3,6 +3,7 @@
import json
import shutil
import tempfile
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import LXMF
@@ -137,6 +138,46 @@ async def test_lxmf_sync_flow(mock_app):
assert data["propagation_node_status"]["progress"] == 75.0
@pytest.mark.asyncio
async def test_lxmf_sync_requests_path_before_sync(mock_app):
mock_router = mock_app.current_context.message_router
outbound = b"somehash"
mock_router.get_outbound_propagation_node.return_value = outbound
sync_handler = next(
r.handler
for r in mock_app.get_routes()
if r.path == "/api/v1/lxmf/propagation-node/sync"
)
with patch("meshchatx.meshchat.RNS.Transport.has_path", return_value=False):
with patch("meshchatx.meshchat.RNS.Transport.request_path") as mock_request:
await sync_handler(None)
mock_request.assert_called_with(outbound)
mock_router.request_messages_from_propagation_node.assert_called_with(
mock_app.current_context.identity
)
@pytest.mark.asyncio
async def test_lxmf_sync_completes_immediately_for_local_preferred_node(mock_app):
mock_router = mock_app.current_context.message_router
local_hash = b"local-prop-node-1"
mock_router.propagation_destination = SimpleNamespace(hash=local_hash)
mock_router.get_outbound_propagation_node.return_value = local_hash
sync_handler = next(
r.handler
for r in mock_app.get_routes()
if r.path == "/api/v1/lxmf/propagation-node/sync"
)
await sync_handler(None)
assert mock_router.propagation_transfer_state == PR_COMPLETE
assert mock_router.propagation_transfer_progress == 1.0
assert mock_router.propagation_transfer_last_result == 0
mock_router.request_messages_from_propagation_node.assert_not_called()
@pytest.mark.asyncio
async def test_hosting_prop_node(mock_app):
mock_router = mock_app.current_context.message_router
@@ -214,6 +255,11 @@ async def test_local_propagation_node_stop_and_restart_routes(mock_app):
"client_propagation_messages_received": 7,
"client_propagation_messages_served": 5,
},
"peers": {
"peer-a": {"rx_bytes": 1000, "tx_bytes": 600},
"peer-b": {"rx_bytes": 300, "tx_bytes": 400},
},
"unpeered_propagation_rx_bytes": 200,
"delivery_limit": 10,
"propagation_limit": 20,
"sync_limit": 30,
@@ -238,11 +284,19 @@ async def test_local_propagation_node_stop_and_restart_routes(mock_app):
stop_response = await stop_handler(None)
stop_data = json.loads(stop_response.body)
assert stop_data["message"] == "Local propagation node stopped"
local_stop = stop_data["local_propagation_node"]
assert local_stop["rx_bytes"] == 1500
assert local_stop["tx_bytes"] == 1000
assert local_stop["messagestore_limit_bytes"] == 4096
assert local_stop["client_messages_received"] == 7
mock_router.disable_propagation.assert_called()
restart_response = await restart_handler(None)
restart_data = json.loads(restart_response.body)
assert restart_data["message"] == "Local propagation node restarted"
local_restart = restart_data["local_propagation_node"]
assert local_restart["rx_bytes"] == 1500
assert local_restart["tx_bytes"] == 1000
mock_router.enable_propagation.assert_called()
@@ -252,9 +306,11 @@ async def test_user_provided_node_hash(mock_app):
node_hash_hex = "d81255ae2ff367d4883b16c9cc8c6178"
# Set this node as preferred
await mock_app.update_config(
{"lxmf_preferred_propagation_node_destination_hash": node_hash_hex},
)
with patch("meshchatx.meshchat.RNS.Transport.request_path") as mock_request_path:
await mock_app.update_config(
{"lxmf_preferred_propagation_node_destination_hash": node_hash_hex},
)
mock_request_path.assert_called_with(bytes.fromhex(node_hash_hex))
# Check if the router was updated with the correct bytes
mock_app.current_context.message_router.set_outbound_propagation_node.assert_called_with(
@@ -278,6 +334,26 @@ async def test_user_provided_node_hash(mock_app):
)
@pytest.mark.asyncio
async def test_destination_path_returns_local_hop_zero_for_local_destinations(mock_app):
local_hash = mock_app.current_context.identity.hash.hex()
path_handler = next(
r.handler
for r in mock_app.get_routes()
if r.path == "/api/v1/destination/{destination_hash}/path"
)
request = SimpleNamespace(
match_info={"destination_hash": local_hash},
query={},
)
response = await path_handler(request)
data = json.loads(response.body)
assert data["path"]["hops"] == 0
assert data["path"]["next_hop"] == local_hash
assert data["path"]["next_hop_interface"] == "Local"
def test_convert_propagation_node_state_maps_all_lxmf_transfer_states():
from meshchatx.src.backend.meshchat_utils import (
convert_propagation_node_state_to_string,
@@ -0,0 +1,171 @@
# SPDX-License-Identifier: 0BSD
import json
import shutil
import tempfile
from types import SimpleNamespace
from unittest.mock import patch
import LXMF
import pytest
import RNS
from meshchatx.meshchat import ReticulumMeshChat
PR_IDLE = LXMF.LXMRouter.PR_IDLE
PR_PATH_REQUESTED = LXMF.LXMRouter.PR_PATH_REQUESTED
PR_PATH_TIMEOUT = LXMF.LXMRouter.PR_PATH_TIMEOUT
PR_RECEIVING = LXMF.LXMRouter.PR_RECEIVING
PR_COMPLETE = LXMF.LXMRouter.PR_COMPLETE
PR_FAILED = LXMF.LXMRouter.PR_FAILED
class FakePropagationRouter:
PR_IDLE = PR_IDLE
PR_PATH_REQUESTED = PR_PATH_REQUESTED
PR_COMPLETE = PR_COMPLETE
PR_FAILED = PR_FAILED
def __init__(self, local_hash: bytes):
self.outbound_propagation_node = None
self.propagation_destination = SimpleNamespace(
hash=local_hash,
hexhash=local_hash.hex(),
)
self.propagation_transfer_state = self.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = 0
self.request_messages_calls = 0
self.propagation_node = False
def set_outbound_propagation_node(self, destination_hash: bytes):
self.outbound_propagation_node = destination_hash
def get_outbound_propagation_node(self):
return self.outbound_propagation_node
def request_messages_from_propagation_node(self, _identity):
self.request_messages_calls += 1
if self.outbound_propagation_node is None:
self.propagation_transfer_state = self.PR_FAILED
self.propagation_transfer_progress = 0.0
return
if RNS.Transport.has_path(self.outbound_propagation_node):
self.propagation_transfer_state = self.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 3
return
RNS.Transport.request_path(self.outbound_propagation_node)
self.propagation_transfer_state = self.PR_PATH_REQUESTED
self.propagation_transfer_progress = 0.0
@pytest.fixture
def temp_dir():
dir_path = tempfile.mkdtemp()
yield dir_path
shutil.rmtree(dir_path)
@pytest.fixture
def integration_app(temp_dir):
with (
patch("RNS.Reticulum") as mock_rns,
patch("RNS.Transport"),
patch("LXMF.LXMRouter"),
patch("meshchatx.meshchat.get_file_path", return_value="/tmp/mock_path"),
patch("meshchatx.meshchat.generate_ssl_certificate"),
patch("meshchatx.src.backend.meshchat_utils.LXMRouter") as mock_utils_router,
):
mock_rns.return_value.transport_enabled.return_value = False
mock_utils_router.PR_IDLE = PR_IDLE
mock_utils_router.PR_PATH_REQUESTED = PR_PATH_REQUESTED
mock_utils_router.PR_PATH_TIMEOUT = PR_PATH_TIMEOUT
mock_utils_router.PR_RECEIVING = PR_RECEIVING
mock_utils_router.PR_COMPLETE = PR_COMPLETE
mock_utils_router.PR_FAILED = PR_FAILED
app = ReticulumMeshChat(
identity=RNS.Identity(),
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,
)
fake_router = FakePropagationRouter(local_hash=b"\x11" * 16)
app.current_context.message_router = fake_router
with patch.object(
app,
"send_config_to_websocket_clients",
return_value=None,
):
yield app, fake_router
def _route_handler(app, path, method="GET"):
return next(r.handler for r in app.get_routes() if r.path == path and r.method == method)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_remote_propagation_sync_transitions_path_requested_to_complete(integration_app):
app, fake_router = integration_app
remote_hash = b"\x22" * 16
fake_router.set_outbound_propagation_node(remote_hash)
known_paths = set()
request_count = {}
def has_path(destination_hash):
return bytes(destination_hash) in known_paths
def request_path(destination_hash):
key = bytes(destination_hash)
request_count[key] = request_count.get(key, 0) + 1
if request_count[key] >= 2:
known_paths.add(key)
sync_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/sync")
status_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/status")
with (
patch("meshchatx.meshchat.RNS.Transport.has_path", side_effect=has_path),
patch("meshchatx.meshchat.RNS.Transport.request_path", side_effect=request_path),
):
first_sync = await sync_handler(None)
assert first_sync.status == 200
first_status = json.loads((await status_handler(None)).body)["propagation_node_status"]
assert first_status["state"] == "path_requested"
second_sync = await sync_handler(None)
assert second_sync.status == 200
second_status = json.loads((await status_handler(None)).body)["propagation_node_status"]
assert second_status["state"] == "complete"
assert second_status["progress"] == 100.0
assert fake_router.request_messages_calls >= 2
@pytest.mark.asyncio
@pytest.mark.integration
async def test_local_preferred_propagation_sync_completes_without_remote_lookup(integration_app):
app, fake_router = integration_app
local_hash = fake_router.propagation_destination.hash
fake_router.set_outbound_propagation_node(local_hash)
sync_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/sync")
status_handler = _route_handler(app, "/api/v1/lxmf/propagation-node/status")
with (
patch("meshchatx.meshchat.RNS.Transport.has_path", return_value=False),
patch("meshchatx.meshchat.RNS.Transport.request_path"),
):
response = await sync_handler(None)
assert response.status == 200
status_data = json.loads((await status_handler(None)).body)["propagation_node_status"]
assert status_data["state"] == "complete"
assert status_data["progress"] == 100.0
assert fake_router.request_messages_calls == 0
@@ -65,6 +65,12 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node
if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict):
if isinstance(node["local_node_stats"].get("is_running"), bool):
assert (
node.get("is_propagation_enabled")
== node["local_node_stats"]["is_running"]
)
# Test with invalid limit (should not crash)
request.query = {"limit": "invalid"}
@@ -74,6 +80,12 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node
if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict):
if isinstance(node["local_node_stats"].get("is_running"), bool):
assert (
node.get("is_propagation_enabled")
== node["local_node_stats"]["is_running"]
)
# Test with missing limit (should not crash)
request.query = {}
@@ -83,3 +95,9 @@ async def test_propagation_nodes_endpoint_robustness(mock_rns_minimal, temp_dir)
assert "lxmf_propagation_nodes" in data
for node in data["lxmf_propagation_nodes"]:
assert "is_local_node" in node
if node.get("is_local_node") and isinstance(node.get("local_node_stats"), dict):
if isinstance(node["local_node_stats"].get("is_running"), bool):
assert (
node.get("is_propagation_enabled")
== node["local_node_stats"]["is_running"]
)