import os
import random
from contextlib import ExitStack
from unittest.mock import MagicMock, patch
import LXMF
import pytest
import RNS
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.meshchat import ReticulumMeshChat
@pytest.fixture
def temp_dir(tmp_path):
return str(tmp_path)
@pytest.fixture
def mock_app(temp_dir):
real_identity_class = RNS.Identity
class MockIdentityClass(real_identity_class):
def __init__(self, *args, **kwargs):
self.hash = b"test_hash_32_bytes_long_01234567"
self.hexhash = self.hash.hex()
with ExitStack() as stack:
stack.enter_context(patch("meshchatx.src.backend.identity_context.Database"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.ConfigManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.MessageHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.AnnounceManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.ArchiverManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.MapManager"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TelephoneManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.VoicemailManager"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RingtoneManager"),
)
stack.enter_context(patch("meshchatx.src.backend.identity_context.RNCPHandler"))
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNStatusHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.RNProbeHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.TranslatorHandler"),
)
stack.enter_context(
patch("meshchatx.src.backend.identity_context.CommunityInterfacesManager"),
)
stack.enter_context(patch("meshchatx.meshchat.AsyncUtils"))
stack.enter_context(patch("LXMF.LXMRouter"))
stack.enter_context(patch("LXST.Primitives.Telephony"))
stack.enter_context(patch("RNS.Identity", MockIdentityClass))
mock_reticulum_class = stack.enter_context(patch("RNS.Reticulum"))
mock_reticulum_class.MTU = 1200
mock_reticulum_class.return_value.MTU = 1200
mock_transport_class = stack.enter_context(patch("RNS.Transport"))
mock_transport_class.MTU = 1200
mock_transport_class.return_value.MTU = 1200
stack.enter_context(patch("threading.Thread"))
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"announce_sync_propagation_nodes",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"crawler_loop",
new=MagicMock(return_value=None),
),
)
stack.enter_context(
patch.object(
ReticulumMeshChat,
"auto_backup_loop",
new=MagicMock(return_value=None),
),
)
mock_id = MockIdentityClass()
mock_id.get_private_key = MagicMock(return_value=b"test_private_key")
stack.enter_context(
patch.object(MockIdentityClass, "from_file", return_value=mock_id),
)
app = ReticulumMeshChat(
identity=mock_id,
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,
)
app.active_downloads = {}
app.download_id_counter = 0
app.database = MagicMock()
# Setup basic config mocks
app.config = MagicMock()
for attr in [
"display_name",
"theme",
"language",
"voicemail_greeting",
"map_default_lat",
"map_default_lon",
]:
getattr(app.config, attr).get.return_value = "test"
for attr in ["auto_announce_enabled", "voicemail_enabled"]:
getattr(app.config, attr).get.return_value = False
app.config.lxmf_inbound_stamp_cost.get.return_value = 8
return app
# WebSocket API Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(
data=st.recursive(
st.one_of(st.none(), st.booleans(), st.floats(), st.text(), st.integers()),
lambda children: st.one_of(
st.lists(children),
st.dictionaries(st.text(), children),
),
),
)
@pytest.mark.asyncio
async def test_websocket_api_recursive_fuzzing(mock_app, data):
"""Fuzz the websocket API with recursive random data structures."""
mock_client = MagicMock()
mock_client.send_str = MagicMock(side_effect=lambda d: None)
# We must ensure "type" is present if it's a dict, or it might fail early
# but that's also part of fuzzing - seeing if it crashes without "type".
try:
await mock_app.on_websocket_data_received(mock_client, data)
except (KeyError, TypeError, ValueError, AttributeError):
# These are expected for malformed data
pass
except Exception as e:
pytest.fail(f"WebSocket API crashed with recursive data: {e}")
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(msg_type=st.text(), payload=st.dictionaries(st.text(), st.text()))
@pytest.mark.asyncio
async def test_websocket_api_type_fuzzing(mock_app, msg_type, payload):
"""Fuzz the websocket API with random types and flat dictionary payloads."""
mock_client = MagicMock()
mock_client.send_str = MagicMock(side_effect=lambda d: None)
data = {"type": msg_type}
data.update(payload)
try:
await mock_app.on_websocket_data_received(mock_client, data)
except (KeyError, TypeError, ValueError, AttributeError):
pass
except Exception as e:
pytest.fail(f"WebSocket API crashed with type={msg_type}: {e}")
# URI Parsing Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(uri=st.text(min_size=0, max_size=2000))
@pytest.mark.asyncio
async def test_lxm_uri_parsing_fuzzing(mock_app, uri):
"""Fuzz LXM URI ingestion logic."""
try:
# Assuming there's a method or logic that handles lxm:// URIs
if hasattr(mock_app, "ingest_lxm_uri"):
await mock_app.ingest_lxm_uri(uri)
# Also test it through the websocket interface if it exists there
mock_client = MagicMock()
await mock_app.on_websocket_data_received(
mock_client,
{"type": "lxm.ingest_uri", "uri": uri},
)
except (KeyError, TypeError, ValueError, AttributeError):
pass
except Exception as e:
# Some specific exceptions might be okay depending on implementation,
# but generic crash is not.
if "hex" not in str(e).lower(): # Ignore hex decoding errors which are common
pytest.fail(f"URI ingestion crashed with uri={uri}: {e}")
# LXMF Packet/Message Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(
content=st.binary(min_size=0, max_size=5000),
title=st.text(min_size=0, max_size=500),
fields=st.dictionaries(st.integers(min_value=0, max_value=255), st.binary()),
)
def test_lxmf_message_construction_fuzzing(mock_app, content, title, fields):
"""Fuzz LXMF message construction and field handling."""
try:
# Mock a destination
dest = MagicMock()
dest.hash = os.urandom(16)
# Test field assignment
lxmf_msg = LXMF.LXMessage(dest, mock_app.identity, content, title=title)
for k, v in fields.items():
lxmf_msg.fields[k] = v
# Test encoding/decoding if possible
lxmf_msg.pack()
# LXMF.LXMessage.unpack(packed) # Might need more mocks
except Exception:
pass
# Database Migration/Data Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(
table_name=st.sampled_from(["messages", "announces", "identities", "config"]),
data=st.dictionaries(
st.text(),
st.one_of(st.text(), st.integers(), st.binary(), st.none()),
),
)
def test_database_record_fuzzing(mock_app, table_name, data):
"""Fuzz database record insertion logic (simulated)."""
try:
dao = None
if table_name == "messages" and hasattr(mock_app.database, "messages"):
dao = mock_app.database.messages
elif table_name == "announces" and hasattr(mock_app.database, "announces"):
dao = mock_app.database.announces
if dao and hasattr(dao, "upsert"):
dao.upsert(data)
except Exception:
pass
# Config Update Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(
config_updates=st.dictionaries(
st.sampled_from(
[
"display_name",
"auto_announce_enabled",
"theme",
"language",
"voicemail_enabled",
"voicemail_greeting",
"map_default_lat",
"map_default_lon",
"lxmf_inbound_stamp_cost",
],
),
st.one_of(st.text(), st.integers(), st.booleans(), st.none()),
),
)
@pytest.mark.asyncio
async def test_config_update_fuzzing(mock_app, config_updates):
"""Fuzz the update_config method with various types for known keys."""
try:
if hasattr(mock_app, "update_config"):
await mock_app.update_config(config_updates)
except (TypeError, ValueError, AttributeError):
pass
except Exception as e:
pytest.fail(f"Config update crashed: {e}")
# LXMF Paper URI Generation Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(destination_hash=st.text(), content=st.text(), title=st.text())
@pytest.mark.asyncio
async def test_lxm_generate_paper_uri_fuzzing(
mock_app,
destination_hash,
content,
title,
):
"""Fuzz lxm.generate_paper_uri WebSocket handler."""
mock_client = MagicMock()
mock_client.send_str = MagicMock(side_effect=lambda d: None)
data = {
"type": "lxm.generate_paper_uri",
"destination_hash": destination_hash,
"content": content,
"title": title,
}
try:
await mock_app.on_websocket_data_received(mock_client, data)
except (KeyError, TypeError, ValueError, AttributeError):
pass
except Exception as e:
# Ignore common hex decoding or identity not found errors
if "hex" not in str(e).lower() and "identity not found" not in str(e).lower():
pytest.fail(f"Paper URI generation crashed: {e}")
# Telemetry Packet Fuzzing (More intensive)
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(data=st.binary(min_size=0, max_size=10000))
def test_telemetry_deep_fuzzing(data):
"""Deep fuzz the telemetry unpacking logic with large random payloads."""
from meshchatx.src.backend.telemetry_utils import Telemeter
try:
Telemeter.from_packed(data)
except Exception:
pass
# Markdown Renderer Fuzzing
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(text=st.text(min_size=0, max_size=10000))
def test_markdown_renderer_fuzzing(text):
"""Fuzz the markdown to HTML renderer."""
from meshchatx.src.backend.markdown_renderer import MarkdownRenderer
try:
html_out = MarkdownRenderer.render(text)
assert isinstance(html_out, str)
except Exception as e:
pytest.fail(f"MarkdownRenderer crashed: {e}")
@settings(suppress_health_check=[HealthCheck.function_scoped_fixture], deadline=None)
@given(
text=st.one_of(
st.text(min_size=0, max_size=5000),
st.sampled_from(
[
"",
"[x](javascript:alert(1))",
"[x](data:text/html,)",
"**" * 2000,
"#" * 2000,
"`" * 2000,
"[](" * 500 + ")" * 500,
"\x00\x01\x02\n\t",
"\ufffd" * 100,
]
),
),
)
def test_markdown_renderer_dangerous_patterns(text):
"""Fuzz markdown renderer with known risky patterns (XSS, ReDoS, control chars)."""
from meshchatx.src.backend.markdown_renderer import MarkdownRenderer
try:
html_out = MarkdownRenderer.render(text)
assert isinstance(html_out, str)
assert "