From a5fef35a7f58379f40af7d0eec1174cf79daa190 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 15 Apr 2026 20:25:30 -0500 Subject: [PATCH] feat(tests): add integration tests for LXST telephony classes and improve WebAudioBridge tests; mock dependencies for improved test isolation --- tests/backend/conftest.py | 1 - tests/backend/test_lxst_integration.py | 169 +++++++++ tests/backend/test_telephone_audio_ws.py | 149 ++++++++ tests/backend/test_telephone_initiation.py | 397 +++++++++++++++++++++ tests/backend/test_web_audio_bridge.py | 30 +- 5 files changed, 744 insertions(+), 2 deletions(-) create mode 100644 tests/backend/test_lxst_integration.py create mode 100644 tests/backend/test_telephone_audio_ws.py create mode 100644 tests/backend/test_telephone_initiation.py diff --git a/tests/backend/conftest.py b/tests/backend/conftest.py index 9eebfcd..f952168 100644 --- a/tests/backend/conftest.py +++ b/tests/backend/conftest.py @@ -27,7 +27,6 @@ def global_mocks(): return_value=None, ), patch("meshchatx.meshchat.generate_ssl_certificate", return_value=None), - patch("asyncio.sleep", side_effect=lambda *args, **kwargs: asyncio.sleep(0)), ): # Mock run_async to properly close coroutines def mock_run_async(coro): diff --git a/tests/backend/test_lxst_integration.py b/tests/backend/test_lxst_integration.py new file mode 100644 index 0000000..05bf7b1 --- /dev/null +++ b/tests/backend/test_lxst_integration.py @@ -0,0 +1,169 @@ +"""Integration-oriented tests for real LXST telephony classes. + +These tests intentionally use LXST's real ``Telephone`` implementation while stubbing +RNS/network/audio backends, so we validate LXST behavior without requiring hardware. +""" + +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("LXST") + +from LXST.Primitives import Telephony as LXSTTelephony + +from meshchatx.src.backend.telephone_manager import TelephoneManager + + +class _DummyThread: + def __init__(self, target=None, daemon=None): + self.target = target + self.daemon = daemon + + def start(self): + return None + + +class _DummyDestination: + IN = 0 + OUT = 1 + SINGLE = 2 + PROVE_NONE = 3 + + def __init__(self, identity, *_args, **_kwargs): + self.identity = identity + self.hash = b"\xaa" * 16 + + def set_proof_strategy(self, *_args, **_kwargs): + return None + + def set_link_established_callback(self, *_args, **_kwargs): + return None + + def announce(self, *_args, **_kwargs): + return None + + +def _install_lxst_stubs(monkeypatch): + monkeypatch.setattr(LXSTTelephony.RNS, "log", lambda *_args, **_kwargs: None) + monkeypatch.setattr(LXSTTelephony.RNS, "Destination", _DummyDestination) + monkeypatch.setattr( + LXSTTelephony.RNS, + "Transport", + SimpleNamespace( + deregister_destination=MagicMock(), + has_path=lambda *_args, **_kwargs: True, + request_path=lambda *_args, **_kwargs: None, + ), + ) + monkeypatch.setattr(LXSTTelephony.threading, "Thread", _DummyThread) + + +def _mock_identity(): + return SimpleNamespace(hash=b"\x01" * 16) + + +def test_lxst_telephone_lifecycle_and_timeouts(monkeypatch): + _install_lxst_stubs(monkeypatch) + identity = _mock_identity() + + telephone = LXSTTelephony.Telephone(identity) + assert telephone.call_status == LXSTTelephony.Signalling.STATUS_AVAILABLE + assert telephone.destination is not None + + telephone.set_connect_timeout(37) + assert telephone.establishment_timeout == 37 + + telephone.set_busy_tone_time(0) + assert telephone.busy_tone_seconds == 0 + + telephone.teardown() + assert telephone.destination is None + + +def test_lxst_switch_profile_updates_codec_and_frame_time(monkeypatch): + _install_lxst_stubs(monkeypatch) + identity = _mock_identity() + + class _FakeMixer: + def __init__(self, target_frame_ms=None, gain=0.0): + self.target_frame_ms = target_frame_ms + self.gain = gain + self.muted = False + + def stop(self): + return None + + def start(self): + return None + + def mute(self, muted): + self.muted = muted + + def unmute(self, unmuted): + self.muted = not unmuted + + class _FakeLineSource: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def stop(self): + return None + + def start(self): + return None + + class _FakePipeline: + def __init__(self, source=None, codec=None, sink=None): + self.source = source + self.codec = codec + self.sink = sink + + def stop(self): + return None + + def start(self): + return None + + monkeypatch.setattr(LXSTTelephony, "Mixer", _FakeMixer) + monkeypatch.setattr(LXSTTelephony, "LineSource", _FakeLineSource) + monkeypatch.setattr(LXSTTelephony, "Pipeline", _FakePipeline) + + telephone = LXSTTelephony.Telephone(identity) + telephone.call_status = LXSTTelephony.Signalling.STATUS_ESTABLISHED + telephone.active_call = SimpleNamespace(profile=LXSTTelephony.Profiles.QUALITY_MEDIUM, filters=[], packetizer=MagicMock()) + telephone.transmit_mixer = _FakeMixer(target_frame_ms=60, gain=0.0) + telephone.audio_input = _FakeLineSource() + telephone.transmit_pipeline = _FakePipeline() + + telephone.switch_profile(LXSTTelephony.Profiles.QUALITY_HIGH, from_signalling=True) + + assert telephone.active_call.profile == LXSTTelephony.Profiles.QUALITY_HIGH + assert telephone.target_frame_time_ms == LXSTTelephony.Profiles.get_frame_time( + LXSTTelephony.Profiles.QUALITY_HIGH + ) + assert telephone.transmit_codec is not None + + telephone.active_call = None + telephone.teardown() + + +def test_telephone_manager_init_uses_real_lxst_telephone(monkeypatch, tmp_path): + _install_lxst_stubs(monkeypatch) + cfg = MagicMock() + cfg.telephone_audio_profile_id.get.return_value = LXSTTelephony.Profiles.QUALITY_MAX + tm = TelephoneManager( + identity=_mock_identity(), + config_manager=cfg, + storage_dir=str(tmp_path), + ) + + tm.init_telephone() + + assert isinstance(tm.telephone, LXSTTelephony.Telephone) + assert tm.telephone.establishment_timeout == 30 + assert tm.telephone.busy_tone_seconds == 0 + + tm.teardown() diff --git a/tests/backend/test_telephone_audio_ws.py b/tests/backend/test_telephone_audio_ws.py new file mode 100644 index 0000000..4051d8a --- /dev/null +++ b/tests/backend/test_telephone_audio_ws.py @@ -0,0 +1,149 @@ +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer + + +def _build_aio_app(app): + routes = web.RouteTableDef() + auth_mw, mime_mw, sec_mw = app._define_routes(routes) + aio_app = web.Application(middlewares=[auth_mw, mime_mw, sec_mw]) + aio_app.add_routes(routes) + return aio_app + + +@pytest.fixture +def web_audio_app(mock_app): + app = mock_app + app.current_context.running = True + app.config.auth_enabled.set(False) + return app + + +@pytest.mark.asyncio +async def test_telephone_audio_ws_disabled_config_returns_error(web_audio_app): + bridge = MagicMock() + bridge.config_enabled.return_value = False + bridge.send_status = AsyncMock() + bridge.attach_client.return_value = False + bridge.detach_client = MagicMock() + bridge.push_client_frame = MagicMock() + web_audio_app.web_audio_bridge = bridge + + aio_app = _build_aio_app(web_audio_app) + async with TestClient(TestServer(aio_app)) as client: + ws = await client.ws_connect("/ws/telephone/audio") + msg = await ws.receive_json() + await ws.close() + + assert msg["type"] == "error" + assert "disabled" in msg["message"].lower() + bridge.send_status.assert_not_called() + bridge.detach_client.assert_called_once() + + +@pytest.mark.asyncio +async def test_telephone_audio_ws_no_active_call_reports_attach_error(web_audio_app): + async def send_ready(ws): + await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60})) + + bridge = MagicMock() + bridge.config_enabled.return_value = True + bridge.send_status = AsyncMock(side_effect=send_ready) + bridge.attach_client.return_value = False + bridge.detach_client = MagicMock() + bridge.push_client_frame = MagicMock() + web_audio_app.web_audio_bridge = bridge + + aio_app = _build_aio_app(web_audio_app) + async with TestClient(TestServer(aio_app)) as client: + ws = await client.ws_connect("/ws/telephone/audio") + msg1 = await ws.receive_json() + msg2 = await ws.receive_json() + await ws.close() + + assert msg1["type"] == "web_audio.ready" + assert msg2["type"] == "error" + assert "no active call" in msg2["message"].lower() + bridge.attach_client.assert_called_once() + bridge.detach_client.assert_called_once() + + +@pytest.mark.asyncio +async def test_telephone_audio_ws_ping_and_attach_messages(web_audio_app): + async def send_ready(ws): + await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60})) + + bridge = MagicMock() + bridge.config_enabled.return_value = True + bridge.send_status = AsyncMock(side_effect=send_ready) + bridge.attach_client.return_value = True + bridge.detach_client = MagicMock() + bridge.push_client_frame = MagicMock() + web_audio_app.web_audio_bridge = bridge + + aio_app = _build_aio_app(web_audio_app) + async with TestClient(TestServer(aio_app)) as client: + ws = await client.ws_connect("/ws/telephone/audio") + _ready = await ws.receive_json() + await ws.send_json({"type": "attach"}) + await ws.send_json({"type": "ping"}) + pong = await ws.receive_json() + await ws.close() + + assert pong["type"] == "pong" + # One attach during open + one on explicit attach message + assert bridge.attach_client.call_count == 2 + bridge.detach_client.assert_called_once() + + +@pytest.mark.asyncio +async def test_telephone_audio_ws_binary_messages_forward_frames(web_audio_app): + async def send_ready(ws): + await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60})) + + bridge = MagicMock() + bridge.config_enabled.return_value = True + bridge.send_status = AsyncMock(side_effect=send_ready) + bridge.attach_client.return_value = True + bridge.detach_client = MagicMock() + bridge.push_client_frame = MagicMock() + web_audio_app.web_audio_bridge = bridge + + aio_app = _build_aio_app(web_audio_app) + async with TestClient(TestServer(aio_app)) as client: + ws = await client.ws_connect("/ws/telephone/audio") + _ready = await ws.receive_json() + await ws.send_bytes(b"\x01\x02\x03") + await ws.close() + + bridge.push_client_frame.assert_called_once_with(b"\x01\x02\x03") + bridge.detach_client.assert_called_once() + + +@pytest.mark.asyncio +async def test_telephone_audio_ws_bad_json_does_not_crash_handler(web_audio_app): + async def send_ready(ws): + await ws.send_str(json.dumps({"type": "web_audio.ready", "frame_ms": 60})) + + bridge = MagicMock() + bridge.config_enabled.return_value = True + bridge.send_status = AsyncMock(side_effect=send_ready) + bridge.attach_client.return_value = True + bridge.detach_client = MagicMock() + bridge.push_client_frame = MagicMock() + web_audio_app.web_audio_bridge = bridge + + aio_app = _build_aio_app(web_audio_app) + async with TestClient(TestServer(aio_app)) as client: + ws = await client.ws_connect("/ws/telephone/audio") + _ready = await ws.receive_json() + await ws.send_str("{not-valid-json") + await ws.send_json({"type": "ping"}) + pong = await ws.receive_json() + await ws.close() + + assert pong["type"] == "pong" + bridge.detach_client.assert_called_once() diff --git a/tests/backend/test_telephone_initiation.py b/tests/backend/test_telephone_initiation.py new file mode 100644 index 0000000..eb0a5b9 --- /dev/null +++ b/tests/backend/test_telephone_initiation.py @@ -0,0 +1,397 @@ +import asyncio +import tracemalloc +import time +from unittest.mock import MagicMock, patch + +import pytest + +from meshchatx.src.backend.telephone_manager import TelephoneManager + + +@pytest.fixture +def telephone_manager(): + tm = TelephoneManager(identity=MagicMock()) + tm.telephone = MagicMock() + tm.telephone.busy = False + tm.telephone.call_status = 3 + tm.telephone.active_call = None + tm._path_poll_interval_s = 0.005 + tm._path_retry_interval_s = 0.01 + tm._status_poll_interval_s = 0.01 + tm._status_events = [] + tm.on_initiation_status_callback = lambda status, _target: tm._status_events.append( + status + ) + return tm + + +@pytest.mark.asyncio +async def test_initiate_retries_path_requests_during_lookup(telephone_manager): + destination_hash = bytes.fromhex("aa" * 16) + state = {"calls": 0} + telephone_manager._path_retry_interval_s = 0.0 + + def has_path(_destination_hash): + state["calls"] += 1 + return state["calls"] >= 8 + + telephone_manager.telephone.call.side_effect = lambda _identity: setattr( + telephone_manager.telephone, "call_status", 0 + ) + + with ( + patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=MagicMock()), + patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", side_effect=has_path), + patch("meshchatx.src.backend.telephone_manager.RNS.Transport.request_path") as request_path, + ): + await telephone_manager.initiate(destination_hash, timeout_seconds=1) + + assert request_path.call_count >= 1 + + +@pytest.mark.asyncio +async def test_initiate_cancels_quickly_while_finding_path_identity(telephone_manager): + destination_hash = bytes.fromhex("bb" * 16) + cancellation_triggered = {"done": False} + + def request_path_and_cancel(_destination_hash): + if not cancellation_triggered["done"]: + cancellation_triggered["done"] = True + telephone_manager._update_initiation_status(None, None) + + with ( + patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=None), + patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", return_value=False), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.request_path", + side_effect=request_path_and_cancel, + ), + ): + task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=5)) + result = await asyncio.wait_for(task, timeout=0.3) + + assert result is None + + +@pytest.mark.asyncio +async def test_initiate_cancels_quickly_while_dialling(telephone_manager): + destination_hash = bytes.fromhex("cc" * 16) + telephone_manager.telephone.call_status = 2 + + def blocking_call(_identity): + time.sleep(0.2) + + telephone_manager.telephone.call.side_effect = blocking_call + + with ( + patch("meshchatx.src.backend.telephone_manager.RNS.Identity.recall", return_value=MagicMock()), + patch("meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", return_value=True), + ): + task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=5)) + for _ in range(200): + if telephone_manager.initiation_status in ("Establishing link...", "Calling..."): + break + await asyncio.sleep(0) + + telephone_manager._update_initiation_status(None, None) + + result = await asyncio.wait_for(task, timeout=0.5) + + assert result is None + assert telephone_manager.telephone.hangup.called + + +@pytest.mark.asyncio +async def test_cancel_between_identity_resolved_and_path_request(telephone_manager): + destination_hash = bytes.fromhex("dd" * 16) + + async def cancel_during_path(*_args, **_kwargs): + telephone_manager._update_initiation_status(None, None) + return False + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=False, + ), + patch.object(telephone_manager, "_await_path", side_effect=cancel_during_path), + ): + result = await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=1), + timeout=0.3, + ) + + assert result is None + assert not telephone_manager.telephone.call.called + + +@pytest.mark.asyncio +async def test_cancel_after_path_found_before_dialling_stabilizes(telephone_manager): + destination_hash = bytes.fromhex("ee" * 16) + + def slow_call(_identity): + time.sleep(0.1) + telephone_manager.telephone.call_status = 0 + + telephone_manager.telephone.call.side_effect = slow_call + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + ): + task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=2)) + for _ in range(200): + if telephone_manager.initiation_status == "Establishing link...": + break + await asyncio.sleep(0) + telephone_manager._update_initiation_status(None, None) + result = await asyncio.wait_for(task, timeout=0.4) + + assert result is None + assert telephone_manager.telephone.hangup.called + + +@pytest.mark.asyncio +async def test_request_path_exceptions_do_not_abort_discovery(telephone_manager): + destination_hash = bytes.fromhex("12" * 16) + has_path_calls = {"count": 0} + + def has_path(_destination_hash): + has_path_calls["count"] += 1 + return has_path_calls["count"] > 5 + + request_errors = [RuntimeError("boom-1"), RuntimeError("boom-2"), None] + + def request_path(_destination_hash): + value = request_errors.pop(0) if request_errors else None + if isinstance(value, Exception): + raise value + + telephone_manager.telephone.call.side_effect = lambda _identity: setattr( + telephone_manager.telephone, "call_status", 0 + ) + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + side_effect=has_path, + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.request_path", + side_effect=request_path, + ) as mocked_request_path, + ): + await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=1), + timeout=0.5, + ) + + assert mocked_request_path.call_count >= 2 + + +@pytest.mark.asyncio +async def test_flapping_path_state_recovers_and_dials(telephone_manager): + destination_hash = bytes.fromhex("34" * 16) + path_states = [False, True, False, True, True] + + def has_path(_destination_hash): + if path_states: + return path_states.pop(0) + return True + + telephone_manager.telephone.call.side_effect = lambda _identity: setattr( + telephone_manager.telephone, "call_status", 0 + ) + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + side_effect=has_path, + ), + patch("meshchatx.src.backend.telephone_manager.RNS.Transport.request_path"), + ): + result = await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=1), + timeout=0.5, + ) + + assert result is None + assert telephone_manager.telephone.call.called + + +@pytest.mark.asyncio +async def test_call_thread_exception_surfaces_without_hanging(telephone_manager): + destination_hash = bytes.fromhex("56" * 16) + telephone_manager.telephone.call.side_effect = RuntimeError("dial failed") + + async def no_wait(_seconds): + return None + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + patch("meshchatx.src.backend.telephone_manager.asyncio.sleep", side_effect=no_wait), + ): + result = await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=1), + timeout=0.5, + ) + + assert result is None + assert telephone_manager.initiation_status is None + + +@pytest.mark.asyncio +async def test_inconsistent_call_status_finishes_within_timeout(telephone_manager): + destination_hash = bytes.fromhex("78" * 16) + + def inconsistent_call(_identity): + telephone_manager.telephone.call_status = 5 + return None + + telephone_manager.telephone.call.side_effect = inconsistent_call + async def no_wait(_seconds): + return None + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + patch("meshchatx.src.backend.telephone_manager.asyncio.sleep", side_effect=no_wait), + ): + result = await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=0.2), + timeout=0.8, + ) + + assert result is None + + +@pytest.mark.asyncio +async def test_lxst_status_mapping_updates_ui_initiation_states(telephone_manager): + destination_hash = bytes.fromhex("9a" * 16) + + def status_progression_call(_identity): + telephone_manager.telephone.call_status = 2 + time.sleep(0.02) + telephone_manager.telephone.call_status = 4 + time.sleep(0.02) + telephone_manager.telephone.call_status = 5 + time.sleep(0.02) + telephone_manager.telephone.call_status = 0 + + telephone_manager.telephone.call.side_effect = status_progression_call + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + ): + await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=1), + timeout=0.8, + ) + + assert "Calling..." in telephone_manager._status_events + assert "Ringing..." in telephone_manager._status_events + assert "Establishing link..." in telephone_manager._status_events + assert telephone_manager.initiation_status is None + + +@pytest.mark.asyncio +async def test_lxst_busy_and_rejected_end_without_stuck_status(telephone_manager): + destination_hash = bytes.fromhex("bc" * 16) + + for terminal_state in (0, 1): + telephone_manager._status_events.clear() + telephone_manager.telephone.call_status = 3 + telephone_manager.telephone.call.side_effect = lambda _identity, state=terminal_state: setattr( + telephone_manager.telephone, "call_status", state + ) + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + ): + result = await asyncio.wait_for( + telephone_manager.initiate(destination_hash, timeout_seconds=0.6), + timeout=0.8, + ) + + assert result is None + assert telephone_manager.initiation_status is None + + +@pytest.mark.asyncio +async def test_rapid_dial_cancel_soak_has_bounded_memory(telephone_manager): + destination_hash = bytes.fromhex("de" * 16) + loops = 120 + + def slow_call(_identity): + time.sleep(0.03) + telephone_manager.telephone.call_status = 2 + + telephone_manager.telephone.call.side_effect = slow_call + + with ( + patch( + "meshchatx.src.backend.telephone_manager.RNS.Identity.recall", + return_value=MagicMock(), + ), + patch( + "meshchatx.src.backend.telephone_manager.RNS.Transport.has_path", + return_value=True, + ), + ): + tracemalloc.start() + for _ in range(loops): + telephone_manager.telephone.call_status = 3 + task = asyncio.create_task(telephone_manager.initiate(destination_hash, timeout_seconds=1)) + await asyncio.sleep(0.005) + telephone_manager._update_initiation_status(None, None) + await asyncio.wait_for(task, timeout=0.5) + _current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + # Keep this lax enough for CI variance while still catching obvious leaks. + assert peak < 80 * 1024 * 1024 diff --git a/tests/backend/test_web_audio_bridge.py b/tests/backend/test_web_audio_bridge.py index e7775bb..5708c97 100644 --- a/tests/backend/test_web_audio_bridge.py +++ b/tests/backend/test_web_audio_bridge.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import numpy as np import pytest @@ -103,3 +103,31 @@ def test_web_audio_bridge_asyncutils_fallback(): bridge = WebAudioBridge(mock_tele_mgr, mock_config_mgr) assert bridge.loop == mock_loop assert bridge._loop == mock_loop + + +def test_attach_client_returns_false_without_active_call(): + tele_mgr = MagicMock() + tele_mgr.telephone = MagicMock() + tele_mgr.telephone.active_call = None + bridge = WebAudioBridge(tele_mgr, MagicMock()) + + attached = bridge.attach_client(MagicMock()) + + assert attached is False + + +@pytest.mark.asyncio +async def test_send_bytes_to_all_detaches_stale_clients(): + bridge = WebAudioBridge(MagicMock(), MagicMock()) + healthy_client = MagicMock() + healthy_client.send_bytes = AsyncMock(return_value=None) + + stale_client = MagicMock() + stale_client.send_bytes = AsyncMock(side_effect=RuntimeError("socket closed")) + bridge.clients = {healthy_client, stale_client} + + await bridge._send_bytes_to_all(b"pcm") + + healthy_client.send_bytes.assert_awaited_once_with(b"pcm") + stale_client.send_bytes.assert_awaited_once_with(b"pcm") + assert stale_client not in bridge.clients