test(auto_propagation, CallPage): update tests for propagation sync and telephone enabled config

This commit is contained in:
Ivan
2026-05-09 19:01:40 -05:00
parent d1145cd1c2
commit 9d5d03a712
2 changed files with 120 additions and 5 deletions
+116 -1
View File
@@ -137,7 +137,19 @@ async def test_auto_propagation_skips_when_sync_active_and_path_exists():
)
context.message_router.propagation_transfer_state = LXMRouter.PR_RECEIVING
with patch.object(RNS.Transport, "has_path", return_value=True):
with (
patch.object(RNS.Transport, "has_path", return_value=True),
patch.object(RNS.Transport, "path_is_unresponsive", return_value=False),
patch.object(
manager,
"_wait_for_path",
return_value=True,
),
patch(
"meshchatx.src.backend.auto_propagation_manager.reticulum_pathfinding.transport_path_table_entry_is_expired",
return_value=False,
),
):
await manager.check_and_update_propagation_node()
app.stop_propagation_node_sync.assert_not_called()
@@ -170,6 +182,7 @@ async def test_auto_propagation_finds_new_node_when_sync_stuck_no_path():
patch.object(RNS.Transport, "hops_to", return_value=1),
patch.object(manager, "_wait_for_path", return_value=True),
patch.object(manager, "_probe_propagation_sync", return_value=True),
patch("meshchatx.src.backend.auto_propagation_manager.asyncio.sleep"),
):
# Current node A has no path, candidate B has a path.
mock_has_path.side_effect = lambda dh: dh == bytes.fromhex(_VALID_HASH_B)
@@ -235,3 +248,105 @@ def test_stop_propagation_node_sync_noops_when_message_router_none():
ctx = MagicMock()
ctx.message_router = None
ReticulumMeshChat.stop_propagation_node_sync(app, context=ctx)
@pytest.mark.asyncio
async def test_auto_propagation_interrupts_sync_when_path_unresponsive():
"""Stop a stuck sync when the current path is unresponsive.
Even if RNS reports has_path=True, a stale or unresponsive path should be
treated as broken so the manager can look for a working alternative.
"""
manager, app, context, config, database = _make_manager()
config.lxmf_preferred_propagation_node_auto_select.get.return_value = True
config.lxmf_preferred_propagation_node_destination_hash.get.return_value = (
_VALID_HASH_A
)
context.message_router.propagation_transfer_state = LXMRouter.PR_RECEIVING
announce1 = {
"destination_hash": _VALID_HASH_B,
"app_data": _APP_DATA_ENABLED,
}
database.announces.get_announces.return_value = [announce1]
with (
patch.object(RNS.Transport, "has_path", return_value=True),
patch.object(RNS.Transport, "path_is_unresponsive", return_value=True),
patch.object(RNS.Transport, "hops_to", return_value=1),
patch.object(manager, "_wait_for_path", return_value=True),
patch.object(manager, "_probe_propagation_sync", return_value=True),
patch("meshchatx.src.backend.auto_propagation_manager.asyncio.sleep"),
patch(
"meshchatx.src.backend.auto_propagation_manager.reticulum_pathfinding.transport_path_table_entry_is_expired",
return_value=False,
),
):
await manager.check_and_update_propagation_node()
app.stop_propagation_node_sync.assert_called_once_with(context=context)
app.set_active_propagation_node.assert_called_once_with(
_VALID_HASH_B,
context=context,
)
@pytest.mark.asyncio
async def test_probe_propagation_sync_ignores_stale_state():
"""A stale non-idle state from a previous sync must not cause a false success.
The probe should wait for PR_IDLE before starting, then only count state
changes that happen after the new request is issued.
"""
import time
manager, app, context, config, database = _make_manager()
router = context.message_router
router.propagation_transfer_state = LXMRouter.PR_RECEIVING
call_count = [0]
async def fake_sleep(_):
call_count[0] += 1
if call_count[0] == 3:
router.propagation_transfer_state = LXMRouter.PR_IDLE
fake_time = [0.0]
def fake_monotonic():
fake_time[0] += 0.5
return fake_time[0]
with (
patch(
"meshchatx.src.backend.auto_propagation_manager.asyncio.sleep", fake_sleep
),
patch.object(time, "monotonic", fake_monotonic),
):
result = await manager._probe_propagation_sync(_VALID_HASH_A)
# The stale state goes idle after a few sleeps, but the new request never
# leaves idle, so the probe must return False rather than True.
assert result is False
app.stop_propagation_node_sync.assert_called()
@pytest.mark.asyncio
async def test_sync_propagation_nodes_skips_when_active_and_not_forced():
"""Auto-sync must not overlap an already-active propagation transfer."""
from meshchatx.meshchat import ReticulumMeshChat
app = ReticulumMeshChat.__new__(ReticulumMeshChat)
ctx = MagicMock()
router = MagicMock()
router.propagation_transfer_state = LXMRouter.PR_RECEIVING
router.PR_IDLE = LXMRouter.PR_IDLE
ctx.message_router = router
ctx.config = MagicMock()
with patch.object(app, "stop_propagation_node_sync") as mock_stop:
await app.sync_propagation_nodes(context=ctx, force=False)
router.request_messages_from_propagation_node.assert_not_called()
mock_stop.assert_not_called()
+4 -4
View File
@@ -22,7 +22,7 @@ describe("CallPage.vue", () => {
voicemail: { unread_count: 0 },
};
if (url.includes("/api/v1/config")) return Promise.resolve({ data: { config: {} } });
if (url.includes("/api/v1/config")) return Promise.resolve({ data: { config: { telephone_enabled: true } } });
if (url.includes("/api/v1/telephone/history")) return Promise.resolve({ data: { call_history: [] } });
if (url.includes("/api/v1/announces")) return Promise.resolve({ data: { announces: [] } });
if (url.includes("/api/v1/telephone/status")) return Promise.resolve({ data: { active_call: null } });
@@ -160,7 +160,7 @@ describe("CallPage.vue", () => {
it("displays 'New Call' UI by default when no active call", async () => {
const wrapper = mountCallPage();
await wrapper.vm.$nextTick();
await flushPromises();
expect(wrapper.text()).toContain("New Call");
expect(wrapper.find('input[type="text"]').exists()).toBe(true);
@@ -168,7 +168,7 @@ describe("CallPage.vue", () => {
it("renders call hops and interface metadata below address", async () => {
const wrapper = mountCallPage();
await wrapper.vm.$nextTick();
await flushPromises();
wrapper.vm.activeCall = {
status: 6,
remote_identity_hash: "ab".repeat(16),
@@ -196,7 +196,7 @@ describe("CallPage.vue", () => {
it("attempts to place a call when 'Call' button is clicked", async () => {
const wrapper = mountCallPage();
await wrapper.vm.$nextTick();
await flushPromises();
const input = wrapper.find('input[type="text"]');
await input.setValue("test-destination");