import html
import json
import math
import os
import shutil
import pytest
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.colour_utils import ColourUtils
from meshchatx.src.backend.identity_manager import IdentityManager
from meshchatx.src.backend.interface_config_parser import InterfaceConfigParser
from meshchatx.src.backend.lxmf_utils import (
convert_db_lxmf_message_to_dict,
convert_lxmf_message_to_dict,
convert_lxmf_method_to_string,
convert_lxmf_state_to_string,
)
from meshchatx.src.backend.markdown_renderer import MarkdownRenderer
from meshchatx.src.backend.meshchat_utils import (
convert_db_favourite_to_dict,
convert_propagation_node_state_to_string,
has_attachments,
message_fields_have_attachments,
parse_bool_query_param,
parse_lxmf_display_name,
parse_lxmf_propagation_node_app_data,
parse_lxmf_stamp_cost,
parse_nomadnetwork_node_display_name,
)
from meshchatx.src.backend.nomadnet_utils import (
convert_nomadnet_field_data_to_map,
convert_nomadnet_string_data_to_map,
)
from meshchatx.src.backend.recovery.crash_recovery import CrashRecovery
from meshchatx.src.backend.telemetry_utils import Telemeter
# Strategies for telemetry data
def _finite_floats(lower, upper):
    """Build a float strategy bounded by ``lower``/``upper`` without NaN or infinity."""
    return st.floats(
        min_value=lower,
        max_value=upper,
        allow_nan=False,
        allow_infinity=False,
    )


# Each strategy below is a bounded, finite float range for one telemetry field.
st_latitude = _finite_floats(-90, 90)
st_longitude = _finite_floats(-180, 180)
st_altitude = _finite_floats(-10000, 100000)
st_speed = _finite_floats(0, 1000)
# Negative bearings are generated on purpose; the round-trip test expects
# them to be reproduced.
st_bearing = _finite_floats(-360, 360)
# NOTE(review): 655.35 looks like the largest 16-bit unsigned value expressed
# in hundredths - confirm against the Telemeter packing format.
st_accuracy = _finite_floats(0, 655.35)
# Timestamps span the unsigned 32-bit integer range.
st_timestamp = st.integers(min_value=0, max_value=2**32 - 1)
@given(
    lat=st_latitude,
    lon=st_longitude,
    alt=st_altitude,
    speed=st_speed,
    bear=st_bearing,
    acc=st_accuracy,
    ts=st_timestamp,
)
def test_telemeter_location_roundtrip(lat, lon, alt, speed, bear, acc, ts):
    """Check that a packed location unpacks back to the values that went in.

    Float fields are compared with an absolute tolerance because packing
    rounds / converts them to fixed point; the timestamp must match exactly.
    """
    encoded = Telemeter.pack_location(lat, lon, alt, speed, bear, acc, ts)
    assert encoded is not None

    decoded = Telemeter.unpack_location(encoded)
    assert decoded is not None

    # (unpacked key, value that was packed, absolute tolerance)
    float_checks = (
        ("latitude", lat, 1e-6),
        ("longitude", lon, 1e-6),
        ("altitude", alt, 1e-2),
        ("speed", speed, 1e-2),
        # The input bearing may be negative; the unpacked value must still match it.
        ("bearing", bear, 1e-2),
        ("accuracy", acc, 1e-2),
    )
    for field, packed_value, tolerance in float_checks:
        assert math.isclose(decoded[field], packed_value, abs_tol=tolerance)

    assert decoded["last_update"] == ts
@given(
    time_utc=st_timestamp,
    lat=st_latitude,
    lon=st_longitude,
    charge=st.integers(min_value=0, max_value=100),
    charging=st.booleans(),
    rssi=st.integers(min_value=-150, max_value=0),
    snr=st.integers(min_value=-20, max_value=20),
    q=st.integers(min_value=0, max_value=100),
)
def test_telemeter_full_pack_roundtrip(
    time_utc,
    lat,
    lon,
    charge,
    charging,
    rssi,
    snr,
    q,
):
    """Pack time, location, battery and link data together and read it back.

    Integer and boolean fields must survive exactly; latitude and longitude
    are compared with an absolute tolerance of 1e-6.
    """
    blob = Telemeter.pack(
        time_utc=time_utc,
        location={"latitude": lat, "longitude": lon},
        battery={"charge_percent": charge, "charging": charging},
        physical_link={"rssi": rssi, "snr": snr, "q": q},
    )

    restored = Telemeter.from_packed(blob)
    assert restored is not None
    assert restored["time"]["utc"] == time_utc

    position = restored["location"]
    assert math.isclose(position["latitude"], lat, abs_tol=1e-6)
    assert math.isclose(position["longitude"], lon, abs_tol=1e-6)

    power = restored["battery"]
    assert power["charge_percent"] == charge
    assert power["charging"] == charging

    link = restored["physical_link"]
    assert link["rssi"] == rssi
    assert link["snr"] == snr
    assert link["q"] == q
# fullmatch=True is required here: with a plain "$" anchor Hypothesis may
# append a trailing "\n" to the generated string, which is not a colour of
# the intended shape (optional "#" followed by exactly six hex digits).
@given(hex_val=st.from_regex(r"^#?[0-9a-fA-F]{6}$", fullmatch=True))
def test_colour_utils_hex_to_byte_array(hex_val):
    """Check hex colour strings convert to the matching three raw bytes.

    ``hex_val`` is an optional ``#`` followed by exactly six hexadecimal
    digits. The result must have length 3 and equal the bytes obtained by
    decoding the digits directly.
    """
    result = ColourUtils.hex_colour_to_byte_array(hex_val)
    assert len(result) == 3

    # Verify manual conversion matches
    expected = bytes.fromhex(hex_val.lstrip("#"))
    assert result == expected
@given(
    val=st.one_of(
        st.sampled_from(
            ["1", "true", "yes", "on", "0", "false", "no", "off", "random"],
        ),
        st.none(),
    ),
)
def test_parse_bool_query_param(val):
    """Check that only the recognised truthy strings parse to ``True``.

    ``None`` and every other sampled string must yield exactly ``False``.
    """
    outcome = parse_bool_query_param(val)
    # Expected result: True only for a non-None value whose lowercase form
    # is one of the accepted truthy spellings.
    should_be_true = val is not None and val.lower() in {"1", "true", "yes", "on"}
    assert outcome is should_be_true
@given(data=st.binary())
def test_parse_lxmf_display_name_robustness(data):
    """Feed arbitrary bytes to parse_lxmf_display_name; any Exception fails the test."""
    try:
        parse_lxmf_display_name(data)
    except Exception as exc:
        pytest.fail(f"parse_lxmf_display_name crashed: {exc}")
@given(data=st.binary())
def test_parse_lxmf_propagation_node_app_data_robustness(data):
    """Feed arbitrary bytes to parse_lxmf_propagation_node_app_data.

    The parser may return ``None``; any other result must be a dict holding
    the ``enabled``, ``timebase`` and ``per_transfer_limit`` keys. The shape
    checks sit inside the ``try`` so a failed check is reported through
    ``pytest.fail`` just like a raised exception.
    """
    try:
        parsed = parse_lxmf_propagation_node_app_data(data)
        if parsed is not None:
            assert isinstance(parsed, dict)
            for required_key in ("enabled", "timebase", "per_transfer_limit"):
                assert required_key in parsed
    except Exception as exc:
        pytest.fail(f"parse_lxmf_propagation_node_app_data crashed: {exc}")
@given(
    names=st.lists(
        st.text(min_size=1).filter(lambda x: "]" not in x and x.strip()),
        min_size=1,
        max_size=5,
    ),
    keys=st.lists(
        st.text(min_size=1).filter(
            lambda x: "=" not in x and "]" not in x and x.strip()
        ),
        min_size=1,
        max_size=5,
    ),
    values=st.lists(st.text().filter(lambda x: "\n" not in x), min_size=1, max_size=5),
)
def test_interface_config_parser_best_effort_property(names, keys, values):
    """Parse a deliberately malformed config and require a list without a crash.

    Every section header is written without its closing ``]]`` so the parser
    has to fall back to best-effort handling.
    """
    # The same key/value lines are repeated under every (broken) section header.
    option_lines = [f" {k} = {v}" for k, v in zip(keys, values)]

    config_lines = ["[interfaces]"]
    for name in names:
        # Intentionally corrupt the header: the closing ]] is missing.
        config_lines.append(f"[[{name}")
        config_lines.extend(option_lines)
    config_text = "\n".join(config_lines)

    try:
        # Must not raise; the result only has to be a list.
        parsed = InterfaceConfigParser.parse(config_text)
        assert isinstance(parsed, list)
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse best-effort crashed: {exc}")
@given(data=st.binary())
def test_parse_lxmf_stamp_cost_robustness(data):
    """Feed arbitrary bytes to parse_lxmf_stamp_cost; any Exception fails the test."""
    try:
        parse_lxmf_stamp_cost(data)
    except Exception as exc:
        pytest.fail(f"parse_lxmf_stamp_cost crashed: {exc}")
@given(name=st.text())
def test_parse_nomadnetwork_node_display_name_robustness(name):
    """Feed arbitrary text to parse_nomadnetwork_node_display_name; any Exception fails."""
    try:
        parse_nomadnetwork_node_display_name(name)
    except Exception as exc:
        pytest.fail(f"parse_nomadnetwork_node_display_name crashed: {exc}")
@given(packed=st.binary())
def test_telemeter_from_packed_robustness(packed):
    """Feed arbitrary bytes to Telemeter.from_packed; any Exception fails the test."""
    try:
        Telemeter.from_packed(packed)
    except Exception as exc:
        pytest.fail(f"Telemeter.from_packed crashed: {exc}")
@given(text=st.text())
def test_markdown_renderer_no_crash(text):
    """Render arbitrary text with MarkdownRenderer.render; any Exception fails the test."""
    try:
        MarkdownRenderer.render(text)
    except Exception as exc:
        pytest.fail(f"MarkdownRenderer.render crashed: {exc}")
@given(text=st.text())
def test_interface_config_parser_no_crash(text):
    """Parse arbitrary text with InterfaceConfigParser.parse; any Exception fails the test."""
    try:
        InterfaceConfigParser.parse(text)
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse crashed: {exc}")
@given(
    names=st.lists(
        st.text(
            min_size=1,
            alphabet=st.characters(
                blacklist_categories=("Cc", "Cs"), blacklist_characters="[]"
            ),
        ).filter(lambda x: x.strip() == x and x),
        min_size=1,
        max_size=5,
        unique=True,
    ),
    keys=st.lists(
        st.text(
            min_size=1,
            alphabet=st.characters(
                blacklist_categories=("Cc", "Cs"), blacklist_characters="[]="
            ),
        ).filter(lambda x: x.strip() == x and x),
        min_size=1,
        max_size=5,
        unique=True,
    ),
    values=st.lists(
        st.text(alphabet=st.characters(blacklist_categories=("Cc", "Cs"))).filter(
            lambda x: "\n" not in x
        ),
        min_size=1,
        max_size=5,
    ),
)
def test_interface_config_parser_structured(names, keys, values):
    """Parse a well-formed config and check every returned interface name.

    Section names and keys are generated already stripped and free of the
    bracket characters (keys also exclude ``=``), so each parsed interface is
    expected to carry a ``name`` that is one of the generated names.
    """
    # The same key/value lines are repeated under every section header.
    option_lines = [f" {k} = {v}" for k, v in zip(keys, values)]

    config_lines = ["[interfaces]"]
    for name in names:
        config_lines.append(f"[[{name}]]")
        config_lines.extend(option_lines)
    config_text = "\n".join(config_lines)

    try:
        parsed = InterfaceConfigParser.parse(config_text)
        # The number of parsed interfaces is deliberately not compared with
        # len(names): per the original author's note, unusual characters
        # might still make ConfigObj skip a section, although the filtered
        # alphabet should keep that rare.
        for iface in parsed:
            assert "name" in iface
            # Generated names are already stripped, so the parsed name is
            # expected to be one of them verbatim.
            assert iface["name"] in names
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse failed on structured input: {exc}")
@given(
    interfaces=st.lists(
        st.dictionaries(
            keys=st.sampled_from(
                [
                    "name",
                    "type",
                    "reachable_on",
                    "target_host",
                    "remote",
                    "listen_ip",
                    "port",
                    "target_port",
                    "listen_port",
                    "discovery_hash",
                    "transport_id",
                    "network_id",
                ]
            ),
            values=st.one_of(st.text(), st.integers(), st.none()),
            max_size=12,
        ),
        max_size=40,
    ),
    whitelist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()),
    blacklist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()),
)
def test_discovery_filter_robustness(interfaces, whitelist, blacklist):
    """Filter arbitrary interface dicts with arbitrary white/blacklists.

    The filter must not raise, must return a list, and must never return
    more entries than it was given.
    """
    try:
        kept = ReticulumMeshChat.filter_discovered_interfaces(
            interfaces,
            whitelist,
            blacklist,
        )
    except Exception as exc:
        pytest.fail(f"Discovery filtering crashed: {exc}")
    else:
        # Shape checks run only when filtering itself succeeded.
        assert isinstance(kept, list)
        assert len(kept) <= len(interfaces)
# Strategy for a database message row
# Column names a generated row may contain.
_DB_MESSAGE_COLUMNS = [
    "id",
    "hash",
    "source_hash",
    "destination_hash",
    "is_incoming",
    "state",
    "progress",
    "method",
    "delivery_attempts",
    "next_delivery_attempt_at",
    "title",
    "content",
    "fields",
    "timestamp",
    "rssi",
    "snr",
    "quality",
    "is_spam",
    "created_at",
    "updated_at",
]
# Any column may hold None, an int, a finite float, text, a bool or a hex string.
_db_message_cell = st.one_of(
    st.none(),
    st.integers(),
    st.floats(allow_nan=False, allow_infinity=False),
    st.text(),
    st.booleans(),
    st.binary().map(bytes.hex),
)
# Rows are kept only when both timestamp columns were generated.
st_db_message = st.dictionaries(
    keys=st.sampled_from(_DB_MESSAGE_COLUMNS),
    values=_db_message_cell,
).filter(lambda row: {"created_at", "updated_at"} <= row.keys())
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(db_message=st_db_message)
def test_convert_db_lxmf_message_to_dict_robustness(db_message):
    """Run convert_db_lxmf_message_to_dict on loosely typed rows.

    Missing columns are filled with ``None`` and an undecodable ``fields``
    value is replaced by ``"{}"`` before the call. Any ``Exception`` raised by
    the converter is swallowed on purpose, so this test makes no assertion
    about the converted output.
    """
    # Fill in every column the converter needs; existing values are kept.
    for column in (
        "id",
        "hash",
        "source_hash",
        "destination_hash",
        "is_incoming",
        "state",
        "progress",
        "method",
        "delivery_attempts",
        "next_delivery_attempt_at",
        "title",
        "content",
        "fields",
        "timestamp",
        "rssi",
        "snr",
        "quality",
        "is_spam",
        "created_at",
        "updated_at",
    ):
        db_message.setdefault(column, None)

    # A non-None "fields" value must be decodable JSON; otherwise substitute
    # an empty JSON object.
    raw_fields = db_message["fields"]
    if raw_fields is not None:
        try:
            json.loads(raw_fields)
        except (ValueError, TypeError, json.JSONDecodeError):
            db_message["fields"] = "{}"

    try:
        convert_db_lxmf_message_to_dict(db_message)
    except Exception:
        # Errors on really odd data are expected and tolerated here.
        pass
@given(data=st.dictionaries(keys=st.text(), values=st.text()))
def test_convert_nomadnet_field_data_to_map(data):
    """Check every input key reappears as ``field_<key>`` with its value unchanged."""
    mapped = convert_nomadnet_field_data_to_map(data)
    assert len(mapped) == len(data)
    for name, expected in data.items():
        assert mapped[f"field_{name}"] == expected
@given(
    data=st.dictionaries(
        keys=st.text().filter(lambda x: "=" not in x and "|" not in x and x),
        values=st.text().filter(lambda x: "|" not in x),
    ),
)
def test_convert_nomadnet_string_data_to_map_roundtrip(data):
    """Serialise a dict as ``key=value`` pairs joined by ``|`` and parse it back.

    Every key must reappear as ``var_<key>`` with its original value, and the
    parsed map must contain exactly as many entries as the input.
    """
    # Construct a string like key1=val1|key2=val2
    encoded = "|".join(f"{name}={value}" for name, value in data.items())
    mapped = convert_nomadnet_string_data_to_map(encoded)
    assert len(mapped) == len(data)
    for name, expected in data.items():
        assert mapped[f"var_{name}"] == expected
@given(text=st.text())
def test_markdown_renderer_xss_protection(text):
# Basic check: if we use {text}"
result = MarkdownRenderer.render(input_text)
assert "