# SPDX-License-Identifier: 0BSD import html import json import math import os import shutil import pytest from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st from meshchatx.meshchat import ReticulumMeshChat from meshchatx.src.backend.colour_utils import ColourUtils from meshchatx.src.backend.identity_manager import IdentityManager from meshchatx.src.backend.interface_config_parser import InterfaceConfigParser from meshchatx.src.backend.lxmf_utils import ( convert_db_lxmf_message_to_dict, convert_lxmf_message_to_dict, convert_lxmf_method_to_string, convert_lxmf_state_to_string, ) from meshchatx.src.backend.markdown_renderer import MarkdownRenderer from meshchatx.src.backend.meshchat_utils import ( convert_db_favourite_to_dict, convert_propagation_node_state_to_string, has_attachments, message_fields_have_attachments, parse_bool_query_param, parse_lxmf_display_name, parse_lxmf_propagation_node_app_data, parse_lxmf_stamp_cost, parse_nomadnetwork_node_display_name, ) from meshchatx.src.backend.nomadnet_utils import ( convert_nomadnet_field_data_to_map, convert_nomadnet_string_data_to_map, ) from meshchatx.src.backend.recovery.crash_recovery import CrashRecovery from meshchatx.src.backend.telemetry_utils import Telemeter # Strategies for telemetry data st_latitude = st.floats( min_value=-90, max_value=90, allow_nan=False, allow_infinity=False, ) st_longitude = st.floats( min_value=-180, max_value=180, allow_nan=False, allow_infinity=False, ) st_altitude = st.floats( min_value=-10000, max_value=100000, allow_nan=False, allow_infinity=False, ) st_speed = st.floats(min_value=0, max_value=1000, allow_nan=False, allow_infinity=False) st_bearing = st.floats( min_value=-360, max_value=360, allow_nan=False, allow_infinity=False, ) st_accuracy = st.floats( min_value=0, max_value=655.35, allow_nan=False, allow_infinity=False, ) st_timestamp = st.integers(min_value=0, max_value=2**32 - 1) @given( lat=st_latitude, lon=st_longitude, alt=st_altitude, speed=st_speed, bear=st_bearing, acc=st_accuracy, ts=st_timestamp, ) def test_telemeter_location_roundtrip(lat, lon, alt, speed, bear, acc, ts): packed = Telemeter.pack_location(lat, lon, alt, speed, bear, acc, ts) assert packed is not None unpacked = Telemeter.unpack_location(packed) assert unpacked is not None # Check with tolerance due to rounding/fixed point conversion in packing assert math.isclose(unpacked["latitude"], lat, abs_tol=1e-6) assert math.isclose(unpacked["longitude"], lon, abs_tol=1e-6) assert math.isclose(unpacked["altitude"], alt, abs_tol=1e-2) assert math.isclose(unpacked["speed"], speed, abs_tol=1e-2) # Bearing can be negative in input but unpacked should match assert math.isclose(unpacked["bearing"], bear, abs_tol=1e-2) assert math.isclose(unpacked["accuracy"], acc, abs_tol=1e-2) assert unpacked["last_update"] == ts @given( time_utc=st_timestamp, lat=st_latitude, lon=st_longitude, charge=st.integers(min_value=0, max_value=100), charging=st.booleans(), rssi=st.integers(min_value=-150, max_value=0), snr=st.integers(min_value=-20, max_value=20), q=st.integers(min_value=0, max_value=100), ) def test_telemeter_full_pack_roundtrip( time_utc, lat, lon, charge, charging, rssi, snr, q, ): location = {"latitude": lat, "longitude": lon} battery = {"charge_percent": charge, "charging": charging} physical_link = {"rssi": rssi, "snr": snr, "q": q} packed = Telemeter.pack( time_utc=time_utc, location=location, battery=battery, physical_link=physical_link, ) unpacked = Telemeter.from_packed(packed) assert unpacked is not None assert unpacked["time"]["utc"] == time_utc assert math.isclose(unpacked["location"]["latitude"], lat, abs_tol=1e-6) assert math.isclose(unpacked["location"]["longitude"], lon, abs_tol=1e-6) assert unpacked["battery"]["charge_percent"] == charge assert unpacked["battery"]["charging"] == charging assert unpacked["physical_link"]["rssi"] == rssi assert unpacked["physical_link"]["snr"] == snr assert unpacked["physical_link"]["q"] == q @given(hex_val=st.from_regex(r"^#?[0-9a-fA-F]{6}$")) def test_colour_utils_hex_to_byte_array(hex_val): result = ColourUtils.hex_colour_to_byte_array(hex_val) assert len(result) == 3 # Verify manual conversion matches clean_hex = hex_val.lstrip("#") expected = bytes.fromhex(clean_hex) assert result == expected @given( val=st.one_of( st.sampled_from( ["1", "true", "yes", "on", "0", "false", "no", "off", "random"], ), st.none(), ), ) def test_parse_bool_query_param(val): result = parse_bool_query_param(val) if val is None: assert result is False elif val.lower() in {"1", "true", "yes", "on"}: assert result is True else: assert result is False @given(data=st.binary()) def test_parse_lxmf_display_name_robustness(data): try: parse_lxmf_display_name(data) except Exception as e: pytest.fail(f"parse_lxmf_display_name crashed: {e}") @given(data=st.binary()) def test_parse_lxmf_propagation_node_app_data_robustness(data): try: result = parse_lxmf_propagation_node_app_data(data) if result is not None: assert isinstance(result, dict) assert "enabled" in result assert "timebase" in result assert "per_transfer_limit" in result except Exception as e: pytest.fail(f"parse_lxmf_propagation_node_app_data crashed: {e}") @given( names=st.lists( st.text(min_size=1).filter(lambda x: "]" not in x and x.strip()), min_size=1, max_size=5, ), keys=st.lists( st.text(min_size=1).filter( lambda x: "=" not in x and "]" not in x and x.strip(), ), min_size=1, max_size=5, ), values=st.lists(st.text().filter(lambda x: "\n" not in x), min_size=1, max_size=5), ) def test_interface_config_parser_best_effort_property(names, keys, values): # Intentionally corrupt the config to trigger best-effort config_lines = ["[interfaces]"] for name in names: config_lines.append(f"[[{name}") # Missing closing ]] for k, v in zip(keys, values, strict=False): config_lines.append(f" {k} = {v}") config_text = "\n".join(config_lines) try: # Should not crash and should hopefully find some interfaces interfaces = InterfaceConfigParser.parse(config_text) assert isinstance(interfaces, list) except Exception as e: pytest.fail(f"InterfaceConfigParser.parse best-effort crashed: {e}") @given(data=st.binary()) def test_parse_lxmf_stamp_cost_robustness(data): try: parse_lxmf_stamp_cost(data) except Exception as e: pytest.fail(f"parse_lxmf_stamp_cost crashed: {e}") @given(name=st.text()) def test_parse_nomadnetwork_node_display_name_robustness(name): try: parse_nomadnetwork_node_display_name(name) except Exception as e: pytest.fail(f"parse_nomadnetwork_node_display_name crashed: {e}") @given(packed=st.binary()) def test_telemeter_from_packed_robustness(packed): try: Telemeter.from_packed(packed) except Exception as e: pytest.fail(f"Telemeter.from_packed crashed: {e}") @given(text=st.text()) def test_markdown_renderer_no_crash(text): try: MarkdownRenderer.render(text) except Exception as e: pytest.fail(f"MarkdownRenderer.render crashed: {e}") @given(text=st.text()) def test_interface_config_parser_no_crash(text): try: InterfaceConfigParser.parse(text) except Exception as e: pytest.fail(f"InterfaceConfigParser.parse crashed: {e}") @given( names=st.lists( st.text( min_size=1, alphabet=st.characters( blacklist_categories=("Cc", "Cs"), blacklist_characters="[]", ), ).filter(lambda x: x.strip() == x and x), min_size=1, max_size=5, unique=True, ), keys=st.lists( st.text( min_size=1, alphabet=st.characters( blacklist_categories=("Cc", "Cs"), blacklist_characters="[]=", ), ).filter(lambda x: x.strip() == x and x), min_size=1, max_size=5, unique=True, ), values=st.lists( st.text(alphabet=st.characters(blacklist_categories=("Cc", "Cs"))).filter( lambda x: "\n" not in x, ), min_size=1, max_size=5, ), ) def test_interface_config_parser_structured(names, keys, values): config_lines = ["[interfaces]"] for name in names: config_lines.append(f"[[{name}]]") for k, v in zip(keys, values, strict=False): config_lines.append(f" {k} = {v}") config_text = "\n".join(config_lines) try: interfaces = InterfaceConfigParser.parse(config_text) # We check if it parsed successfully and names match. # Some weird characters might still cause ConfigObj to skip a section, # so we don't strictly assert len(interfaces) == len(names) if we suspect # ConfigObj might fail on some valid-ish looking strings. # But with the filtered alphabet it should be more stable. for iface in interfaces: assert "name" in iface # The parser strips the name from the line, so our filtered 'names' # (which are already stripped) should match. assert iface["name"] in names except Exception as e: pytest.fail(f"InterfaceConfigParser.parse failed on structured input: {e}") @given( interfaces=st.lists( st.dictionaries( keys=st.sampled_from( [ "name", "type", "reachable_on", "target_host", "remote", "listen_ip", "port", "target_port", "listen_port", "discovery_hash", "transport_id", "network_id", ], ), values=st.one_of(st.text(), st.integers(), st.none()), max_size=12, ), max_size=40, ), whitelist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()), blacklist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()), ) def test_discovery_filter_robustness(interfaces, whitelist, blacklist): try: filtered = ReticulumMeshChat.filter_discovered_interfaces( interfaces, whitelist, blacklist, ) except Exception as e: pytest.fail(f"Discovery filtering crashed: {e}") assert isinstance(filtered, list) assert len(filtered) <= len(interfaces) # Strategy for a database message row st_db_message = st.dictionaries( keys=st.sampled_from( [ "id", "hash", "source_hash", "destination_hash", "is_incoming", "state", "progress", "method", "delivery_attempts", "next_delivery_attempt_at", "title", "content", "fields", "timestamp", "rssi", "snr", "quality", "is_spam", "created_at", "updated_at", ], ), values=st.one_of( st.none(), st.integers(), st.floats(allow_nan=False, allow_infinity=False), st.text(), st.booleans(), st.binary().map(lambda b: b.hex()), ), ).filter(lambda d: "created_at" in d and "updated_at" in d) @settings( suppress_health_check=[ HealthCheck.too_slow, HealthCheck.filter_too_much, ], ) @given(db_message=st_db_message) def test_convert_db_lxmf_message_to_dict_robustness(db_message): # Fill in missing required keys for the function required_keys = [ "id", "hash", "source_hash", "destination_hash", "is_incoming", "state", "progress", "method", "delivery_attempts", "next_delivery_attempt_at", "title", "content", "fields", "timestamp", "rssi", "snr", "quality", "is_spam", "created_at", "updated_at", ] for key in required_keys: if key not in db_message: db_message[key] = None # Ensure fields is a valid JSON string if it's not None if db_message["fields"] is not None: try: json.loads(db_message["fields"]) except (ValueError, TypeError, json.JSONDecodeError): db_message["fields"] = "{}" try: convert_db_lxmf_message_to_dict(db_message) except Exception: # We expect some errors if data is really weird, but it shouldn't crash the whole thing pass @given(data=st.dictionaries(keys=st.text(), values=st.text())) def test_convert_nomadnet_field_data_to_map(data): result = convert_nomadnet_field_data_to_map(data) assert len(result) == len(data) for k, v in data.items(): assert result[f"field_{k}"] == v @given( data=st.dictionaries( keys=st.text().filter(lambda x: "=" not in x and "|" not in x and x), values=st.text().filter(lambda x: "|" not in x), ), ) def test_convert_nomadnet_string_data_to_map_roundtrip(data): # Construct string like key1=val1|key2=val2 input_str = "|".join([f"{k}={v}" for k, v in data.items()]) result = convert_nomadnet_string_data_to_map(input_str) assert len(result) == len(data) for k, v in data.items(): assert result[f"var_{k}"] == v @given(text=st.text()) def test_markdown_renderer_xss_protection(text): # Basic check: if we use {text}" result = MarkdownRenderer.render(input_text) assert "