feat(tests): add integration tests for LXST telephony classes and improve WebAudioBridge tests; mock dependencies for improved test isolation

This commit is contained in:
Ivan
2026-04-15 20:25:30 -05:00
parent 3c72a1d0aa
commit a5fef35a7f
5 changed files with 744 additions and 2 deletions
-1
View File
@@ -27,7 +27,6 @@ def global_mocks():
return_value=None,
),
patch("meshchatx.meshchat.generate_ssl_certificate", return_value=None),
patch("asyncio.sleep", side_effect=lambda *args, **kwargs: asyncio.sleep(0)),
):
# Mock run_async to properly close coroutines
def mock_run_async(coro):
+169
View File
@@ -0,0 +1,169 @@
"""Integration-oriented tests for real LXST telephony classes.
These tests intentionally use LXST's real ``Telephone`` implementation while stubbing
RNS/network/audio backends, so we validate LXST behavior without requiring hardware.
"""
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
pytest.importorskip("LXST")
from LXST.Primitives import Telephony as LXSTTelephony
from meshchatx.src.backend.telephone_manager import TelephoneManager
class _DummyThread:
def __init__(self, target=None, daemon=None):
self.target = target
self.daemon = daemon
def start(self):
return None
class _DummyDestination:
IN = 0
OUT = 1
SINGLE = 2
PROVE_NONE = 3
def __init__(self, identity, *_args, **_kwargs):
self.identity = identity
self.hash = b"\xaa" * 16
def set_proof_strategy(self, *_args, **_kwargs):
return None
def set_link_established_callback(self, *_args, **_kwargs):
return None
def announce(self, *_args, **_kwargs):
return None
def _install_lxst_stubs(monkeypatch):
monkeypatch.setattr(LXSTTelephony.RNS, "log", lambda *_args, **_kwargs: None)
monkeypatch.setattr(LXSTTelephony.RNS, "Destination", _DummyDestination)
monkeypatch.setattr(
LXSTTelephony.RNS,
"Transport",
SimpleNamespace(
deregister_destination=MagicMock(),
has_path=lambda *_args, **_kwargs: True,
request_path=lambda *_args, **_kwargs: None,
),
)
monkeypatch.setattr(LXSTTelephony.threading, "Thread", _DummyThread)
def _mock_identity():
return SimpleNamespace(hash=b"\x01" * 16)
def test_lxst_telephone_lifecycle_and_timeouts(monkeypatch):
_install_lxst_stubs(monkeypatch)
identity = _mock_identity()
telephone = LXSTTelephony.Telephone(identity)
assert telephone.call_status == LXSTTelephony.Signalling.STATUS_AVAILABLE
assert telephone.destination is not None
telephone.set_connect_timeout(37)
assert telephone.establishment_timeout == 37
telephone.set_busy_tone_time(0)
assert telephone.busy_tone_seconds == 0
telephone.teardown()
assert telephone.destination is None
def test_lxst_switch_profile_updates_codec_and_frame_time(monkeypatch):
_install_lxst_stubs(monkeypatch)
identity = _mock_identity()
class _FakeMixer:
def __init__(self, target_frame_ms=None, gain=0.0):
self.target_frame_ms = target_frame_ms
self.gain = gain
self.muted = False
def stop(self):
return None
def start(self):
return None
def mute(self, muted):
self.muted = muted
def unmute(self, unmuted):
self.muted = not unmuted
class _FakeLineSource:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def stop(self):
return None
def start(self):
return None
class _FakePipeline:
def __init__(self, source=None, codec=None, sink=None):
self.source = source
self.codec = codec
self.sink = sink
def stop(self):
return None
def start(self):
return None
monkeypatch.setattr(LXSTTelephony, "Mixer", _FakeMixer)
monkeypatch.setattr(LXSTTelephony, "LineSource", _FakeLineSource)
monkeypatch.setattr(LXSTTelephony, "Pipeline", _FakePipeline)
telephone = LXSTTelephony.Telephone(identity)
telephone.call_status = LXSTTelephony.Signalling.STATUS_ESTABLISHED
telephone.active_call = SimpleNamespace(profile=LXSTTelephony.Profiles.QUALITY_MEDIUM, filters=[], packetizer=MagicMock())
telephone.transmit_mixer = _FakeMixer(target_frame_ms=60, gain=0.0)
telephone.audio_input = _FakeLineSource()
telephone.transmit_pipeline = _FakePipeline()
telephone.switch_profile(LXSTTelephony.Profiles.QUALITY_HIGH, from_signalling=True)
assert telephone.active_call.profile == LXSTTelephony.Profiles.QUALITY_HIGH
assert telephone.target_frame_time_ms == LXSTTelephony.Profiles.get_frame_time(
LXSTTelephony.Profiles.QUALITY_HIGH
)
assert telephone.transmit_codec is not None
telephone.active_call = None
telephone.teardown()
def test_telephone_manager_init_uses_real_lxst_telephone(monkeypatch, tmp_path):
_install_lxst_stubs(monkeypatch)
cfg = MagicMock()
cfg.telephone_audio_profile_id.get.return_value = LXSTTelephony.Profiles.QUALITY_MAX
tm = TelephoneManager(
identity=_mock_identity(),
config_manager=cfg,
storage_dir=str(tmp_path),
)
tm.init_telephone()
assert isinstance(tm.telephone, LXSTTelephony.Telephone)
assert tm.telephone.establishment_timeout == 30
assert tm.telephone.busy_tone_seconds == 0
tm.teardown()
+149
View File
@@ -0,0 +1,149 @@
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
def _build_aio_app(app):
routes = web.RouteTableDef()
auth_mw, mime_mw, sec_mw = app._define_routes(routes)
aio_app = web.Application(middlewares=[auth_mw, mime_mw, sec_mw])
aio_app.add_routes(routes)
return aio_app
@pytest.fixture
def web_audio_app(mock_app):
app = mock_app
app.current_context.running = True
app.config.auth_enabled.set(False)
return app
@pytest.mark.asyncio
async def test_telephone_audio_ws_disabled_config_returns_error(web_audio_app):
bridge = MagicMock()
bridge.config_enabled.return_value = False
bridge.send_status = AsyncMock()
bridge.attach_client.return_value = False
bridge.detach_client = MagicMock()
bridge.push_client_frame = MagicMock()
web_audio_app.web_audio_bridge = bridge
aio_app = _build_aio_app(web_audio_app)
async with TestClient(TestServer(aio_app)) as client:
ws = await client.ws_connect("/ws/telephone/audio")
msg = await ws.receive_json()
await ws.close()
assert msg["type"] == "error"
assert "disabled" in msg["message"].lower()
bridge.send_status.assert_not_called()
bridge.detach_client.assert_called_once()
@pytest.mark.asyncio
async def test_telephone_audio_ws_no_active_call_reports_attach_error(web_audio_app):
async def send_ready(ws):
await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60}))
bridge = MagicMock()
bridge.config_enabled.return_value = True
bridge.send_status = AsyncMock(side_effect=send_ready)
bridge.attach_client.return_value = False
bridge.detach_client = MagicMock()
bridge.push_client_frame = MagicMock()
web_audio_app.web_audio_bridge = bridge
aio_app = _build_aio_app(web_audio_app)
async with TestClient(TestServer(aio_app)) as client:
ws = await client.ws_connect("/ws/telephone/audio")
msg1 = await ws.receive_json()
msg2 = await ws.receive_json()
await ws.close()
assert msg1["type"] == "web_audio.ready"
assert msg2["type"] == "error"
assert "no active call" in msg2["message"].lower()
bridge.attach_client.assert_called_once()
bridge.detach_client.assert_called_once()
@pytest.mark.asyncio
async def test_telephone_audio_ws_ping_and_attach_messages(web_audio_app):
async def send_ready(ws):
await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60}))
bridge = MagicMock()
bridge.config_enabled.return_value = True
bridge.send_status = AsyncMock(side_effect=send_ready)
bridge.attach_client.return_value = True
bridge.detach_client = MagicMock()
bridge.push_client_frame = MagicMock()
web_audio_app.web_audio_bridge = bridge
aio_app = _build_aio_app(web_audio_app)
async with TestClient(TestServer(aio_app)) as client:
ws = await client.ws_connect("/ws/telephone/audio")
_ready = await ws.receive_json()
await ws.send_json({"type": "attach"})
await ws.send_json({"type": "ping"})
pong = await ws.receive_json()
await ws.close()
assert pong["type"] == "pong"
# One attach during open + one on explicit attach message
assert bridge.attach_client.call_count == 2
bridge.detach_client.assert_called_once()
@pytest.mark.asyncio
async def test_telephone_audio_ws_binary_messages_forward_frames(web_audio_app):
async def send_ready(ws):
await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60}))
bridge = MagicMock()
bridge.config_enabled.return_value = True
bridge.send_status = AsyncMock(side_effect=send_ready)
bridge.attach_client.return_value = True
bridge.detach_client = MagicMock()
bridge.push_client_frame = MagicMock()
web_audio_app.web_audio_bridge = bridge
aio_app = _build_aio_app(web_audio_app)
async with TestClient(TestServer(aio_app)) as client:
ws = await client.ws_connect("/ws/telephone/audio")
_ready = await ws.receive_json()
await ws.send_bytes(b"\x01\x02\x03")
await ws.close()
bridge.push_client_frame.assert_called_once_with(b"\x01\x02\x03")
bridge.detach_client.assert_called_once()
@pytest.mark.asyncio
async def test_telephone_audio_ws_bad_json_does_not_crash_handler(web_audio_app):
async def send_ready(ws):
await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60}))
bridge = MagicMock()
bridge.config_enabled.return_value = True
bridge.send_status = AsyncMock(side_effect=send_ready)
bridge.attach_client.return_value = True
bridge.detach_client = MagicMock()
bridge.push_client_frame = MagicMock()
web_audio_app.web_audio_bridge = bridge
aio_app = _build_aio_app(web_audio_app)
async with TestClient(TestServer(aio_app)) as client:
ws = await client.ws_connect("/ws/telephone/audio")
_ready = await ws.receive_json()
await ws.send_str("{not-valid-json")
await ws.send_json({"type": "ping"})
pong = await ws.receive_json()
await ws.close()
assert pong["type"] == "pong"
bridge.detach_client.assert_called_once()
+397
View File
@@ -0,0 +1,397 @@
import asyncio
import tracemalloc
import time
from unittest.mock import MagicMock, patch
import pytest
from meshchatx.src.backend.telephone_manager import TelephoneManager
@pytest.fixture
def telephone_manager():
tm = TelephoneManager(identity=MagicMock())
tm.telephone = MagicMock()
tm.telephone.busy = False
tm.telephone.call_status = 3
tm.telephone.active_call = None
tm._path_poll_interval_s = 0.005
tm._path_retry_interval_s = 0.01
tm._status_poll_interval_s = 0.01
tm._status_events = []
tm.on_initiation_status_callback = lambda status, _target: tm._status_events.append(
status
)
return tm
@pytest.mark.asyncio
async def test_initiate_retries_path_requests_during_lookup(telephone_manager):
destination_hash = bytes.fromhex("aa" * 16)
state = {"calls": 0}
telephone_manager._path_retry_interval_s = 0.0
def has_path(_destination_hash):
state["calls"] += 1
return state["calls"] >= 8
telephone_manager.telephone.call.side_effect = lambda _identity: setattr(
telephone_manager.telephone, "call_status", 0
)
with (
patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=MagicMock()),
patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", side_effect=has_path),
patch("meshchatx.src.backend.telephone_manager.RNS.Transport.request_path") as request_path,
):
await telephone_manager.initiate(destination_hash, timeout_seconds=1)
assert request_path.call_count >= 1
@pytest.mark.asyncio
async def test_initiate_cancels_quickly_while_finding_path_identity(telephone_manager):
destination_hash = bytes.fromhex("bb" * 16)
cancellation_triggered = {"done": False}
def request_path_and_cancel(_destination_hash):
if not cancellation_triggered["done"]:
cancellation_triggered["done"] = True
telephone_manager._update_initiation_status(None, None)
with (
patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=None),
patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", return_value=False),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.request_path",
side_effect=request_path_and_cancel,
),
):
task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=5))
result = await asyncio.wait_for(task, timeout=0.3)
assert result is None
@pytest.mark.asyncio
async def test_initiate_cancels_quickly_while_dialling(telephone_manager):
destination_hash = bytes.fromhex("cc" * 16)
telephone_manager.telephone.call_status = 2
def blocking_call(_identity):
time.sleep(0.2)
telephone_manager.telephone.call.side_effect = blocking_call
with (
patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=MagicMock()),
patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", return_value=True),
):
task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=5))
for _ in range(200):
if telephone_manager.initiation_status in ("Establishing link...", "Calling..."):
break
await asyncio.sleep(0)
telephone_manager._update_initiation_status(None, None)
result = await asyncio.wait_for(task, timeout=0.5)
assert result is None
assert telephone_manager.telephone.hangup.called
@pytest.mark.asyncio
async def test_cancel_between_identity_resolved_and_path_request(telephone_manager):
destination_hash = bytes.fromhex("dd" * 16)
async def cancel_during_path(*_args, **_kwargs):
telephone_manager._update_initiation_status(None, None)
return False
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=False,
),
patch.object(telephone_manager, "_await_path", side_effect=cancel_during_path),
):
result = await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=1),
timeout=0.3,
)
assert result is None
assert not telephone_manager.telephone.call.called
@pytest.mark.asyncio
async def test_cancel_after_path_found_before_dialling_stabilizes(telephone_manager):
destination_hash = bytes.fromhex("ee" * 16)
def slow_call(_identity):
time.sleep(0.1)
telephone_manager.telephone.call_status = 0
telephone_manager.telephone.call.side_effect = slow_call
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
):
task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=2))
for _ in range(200):
if telephone_manager.initiation_status == "Establishing link...":
break
await asyncio.sleep(0)
telephone_manager._update_initiation_status(None, None)
result = await asyncio.wait_for(task, timeout=0.4)
assert result is None
assert telephone_manager.telephone.hangup.called
@pytest.mark.asyncio
async def test_request_path_exceptions_do_not_abort_discovery(telephone_manager):
destination_hash = bytes.fromhex("12" * 16)
has_path_calls = {"count": 0}
def has_path(_destination_hash):
has_path_calls["count"] += 1
return has_path_calls["count"] > 5
request_errors = [RuntimeError("boom-1"), RuntimeError("boom-2"), None]
def request_path(_destination_hash):
value = request_errors.pop(0) if request_errors else None
if isinstance(value, Exception):
raise value
telephone_manager.telephone.call.side_effect = lambda _identity: setattr(
telephone_manager.telephone, "call_status", 0
)
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
side_effect=has_path,
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.request_path",
side_effect=request_path,
) as mocked_request_path,
):
await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=1),
timeout=0.5,
)
assert mocked_request_path.call_count >= 2
@pytest.mark.asyncio
async def test_flapping_path_state_recovers_and_dials(telephone_manager):
destination_hash = bytes.fromhex("34" * 16)
path_states = [False, True, False, True, True]
def has_path(_destination_hash):
if path_states:
return path_states.pop(0)
return True
telephone_manager.telephone.call.side_effect = lambda _identity: setattr(
telephone_manager.telephone, "call_status", 0
)
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
side_effect=has_path,
),
patch("meshchatx.src.backend.telephone_manager.RNS.Transport.request_path"),
):
result = await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=1),
timeout=0.5,
)
assert result is None
assert telephone_manager.telephone.call.called
@pytest.mark.asyncio
async def test_call_thread_exception_surfaces_without_hanging(telephone_manager):
destination_hash = bytes.fromhex("56" * 16)
telephone_manager.telephone.call.side_effect = RuntimeError("dial failed")
async def no_wait(_seconds):
return None
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
patch("meshchatx.src.backend.telephone_manager.asyncio.sleep", side_effect=no_wait),
):
result = await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=1),
timeout=0.5,
)
assert result is None
assert telephone_manager.initiation_status is None
@pytest.mark.asyncio
async def test_inconsistent_call_status_finishes_within_timeout(telephone_manager):
destination_hash = bytes.fromhex("78" * 16)
def inconsistent_call(_identity):
telephone_manager.telephone.call_status = 5
return None
telephone_manager.telephone.call.side_effect = inconsistent_call
async def no_wait(_seconds):
return None
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
patch("meshchatx.src.backend.telephone_manager.asyncio.sleep", side_effect=no_wait),
):
result = await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=0.2),
timeout=0.8,
)
assert result is None
@pytest.mark.asyncio
async def test_lxst_status_mapping_updates_ui_initiation_states(telephone_manager):
destination_hash = bytes.fromhex("9a" * 16)
def status_progression_call(_identity):
telephone_manager.telephone.call_status = 2
time.sleep(0.02)
telephone_manager.telephone.call_status = 4
time.sleep(0.02)
telephone_manager.telephone.call_status = 5
time.sleep(0.02)
telephone_manager.telephone.call_status = 0
telephone_manager.telephone.call.side_effect = status_progression_call
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
):
await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=1),
timeout=0.8,
)
assert "Calling..." in telephone_manager._status_events
assert "Ringing..." in telephone_manager._status_events
assert "Establishing link..." in telephone_manager._status_events
assert telephone_manager.initiation_status is None
@pytest.mark.asyncio
async def test_lxst_busy_and_rejected_end_without_stuck_status(telephone_manager):
destination_hash = bytes.fromhex("bc" * 16)
for terminal_state in (0, 1):
telephone_manager._status_events.clear()
telephone_manager.telephone.call_status = 3
telephone_manager.telephone.call.side_effect = lambda _identity, state=terminal_state: setattr(
telephone_manager.telephone, "call_status", state
)
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
):
result = await asyncio.wait_for(
telephone_manager.initiate(destination_hash, timeout_seconds=0.6),
timeout=0.8,
)
assert result is None
assert telephone_manager.initiation_status is None
@pytest.mark.asyncio
async def test_rapid_dial_cancel_soak_has_bounded_memory(telephone_manager):
destination_hash = bytes.fromhex("de" * 16)
loops = 120
def slow_call(_identity):
time.sleep(0.03)
telephone_manager.telephone.call_status = 2
telephone_manager.telephone.call.side_effect = slow_call
with (
patch(
"meshchatx.src.backend.telephone_manager.RNS.Identity.recall",
return_value=MagicMock(),
),
patch(
"meshchatx.src.backend.telephone_manager.RNS.Transport.has_path",
return_value=True,
),
):
tracemalloc.start()
for _ in range(loops):
telephone_manager.telephone.call_status = 3
task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=1))
await asyncio.sleep(0.005)
telephone_manager._update_initiation_status(None, None)
await asyncio.wait_for(task, timeout=0.5)
_current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Keep this lax enough for CI variance while still catching obvious leaks.
assert peak < 80 * 1024 * 1024
+29 -1
View File
@@ -1,5 +1,5 @@
import asyncio
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
import pytest
@@ -103,3 +103,31 @@ def test_web_audio_bridge_asyncutils_fallback():
bridge = WebAudioBridge(mock_tele_mgr, mock_config_mgr)
assert bridge.loop == mock_loop
assert bridge._loop == mock_loop
def test_attach_client_returns_false_without_active_call():
tele_mgr = MagicMock()
tele_mgr.telephone = MagicMock()
tele_mgr.telephone.active_call = None
bridge = WebAudioBridge(tele_mgr, MagicMock())
attached = bridge.attach_client(MagicMock())
assert attached is False
@pytest.mark.asyncio
async def test_send_bytes_to_all_detaches_stale_clients():
bridge = WebAudioBridge(MagicMock(), MagicMock())
healthy_client = MagicMock()
healthy_client.send_bytes = AsyncMock(return_value=None)
stale_client = MagicMock()
stale_client.send_bytes = AsyncMock(side_effect=RuntimeError("socket closed"))
bridge.clients = {healthy_client, stale_client}
await bridge._send_bytes_to_all(b"pcm")
healthy_client.send_bytes.assert_awaited_once_with(b"pcm")
stale_client.send_bytes.assert_awaited_once_with(b"pcm")
assert stale_client not in bridge.clients