# SPDX-License-Identifier: 0BSD
import html
import json
import math
import os
import shutil
import pytest
from hypothesis import HealthCheck, given, settings
from hypothesis import strategies as st
from meshchatx.meshchat import ReticulumMeshChat
from meshchatx.src.backend.colour_utils import ColourUtils
from meshchatx.src.backend.identity_manager import IdentityManager
from meshchatx.src.backend.interface_config_parser import InterfaceConfigParser
from meshchatx.src.backend.lxmf_utils import (
convert_db_lxmf_message_to_dict,
convert_lxmf_message_to_dict,
convert_lxmf_method_to_string,
convert_lxmf_state_to_string,
)
from meshchatx.src.backend.markdown_renderer import MarkdownRenderer
from meshchatx.src.backend.meshchat_utils import (
convert_db_favourite_to_dict,
convert_propagation_node_state_to_string,
has_attachments,
message_fields_have_attachments,
parse_bool_query_param,
parse_lxmf_display_name,
parse_lxmf_propagation_node_app_data,
parse_lxmf_stamp_cost,
parse_nomadnetwork_node_display_name,
)
from meshchatx.src.backend.nomadnet_utils import (
convert_nomadnet_field_data_to_map,
convert_nomadnet_string_data_to_map,
)
from meshchatx.src.backend.recovery.crash_recovery import CrashRecovery
from meshchatx.src.backend.telemetry_utils import Telemeter
# Hypothesis strategies describing the value ranges fed to the telemetry tests.
def _finite_floats(lower, upper):
    """Return a strategy for finite (no NaN, no infinity) floats in [lower, upper]."""
    return st.floats(
        min_value=lower,
        max_value=upper,
        allow_nan=False,
        allow_infinity=False,
    )


st_latitude = _finite_floats(-90, 90)
st_longitude = _finite_floats(-180, 180)
st_altitude = _finite_floats(-10000, 100000)
st_speed = _finite_floats(0, 1000)
st_bearing = _finite_floats(-360, 360)
st_accuracy = _finite_floats(0, 655.35)
# Non-negative integers up to 2**32 - 1.
st_timestamp = st.integers(min_value=0, max_value=2**32 - 1)
@given(
    lat=st_latitude,
    lon=st_longitude,
    alt=st_altitude,
    speed=st_speed,
    bear=st_bearing,
    acc=st_accuracy,
    ts=st_timestamp,
)
def test_telemeter_location_roundtrip(lat, lon, alt, speed, bear, acc, ts):
    """Pack a location and unpack it again; every field must survive the trip.

    Float fields are compared with an absolute tolerance because packing
    involves rounding / fixed-point conversion; the timestamp must match
    exactly.
    """
    packed = Telemeter.pack_location(lat, lon, alt, speed, bear, acc, ts)
    assert packed is not None

    unpacked = Telemeter.unpack_location(packed)
    assert unpacked is not None

    # (unpacked key, original value, allowed absolute error)
    float_checks = (
        ("latitude", lat, 1e-6),
        ("longitude", lon, 1e-6),
        ("altitude", alt, 1e-2),
        ("speed", speed, 1e-2),
        # The input bearing may be negative; the unpacked value should still match.
        ("bearing", bear, 1e-2),
        ("accuracy", acc, 1e-2),
    )
    for field, original, tolerance in float_checks:
        assert math.isclose(unpacked[field], original, abs_tol=tolerance), field

    assert unpacked["last_update"] == ts
@given(
    time_utc=st_timestamp,
    lat=st_latitude,
    lon=st_longitude,
    charge=st.integers(min_value=0, max_value=100),
    charging=st.booleans(),
    rssi=st.integers(min_value=-150, max_value=0),
    snr=st.integers(min_value=-20, max_value=20),
    q=st.integers(min_value=0, max_value=100),
)
def test_telemeter_full_pack_roundtrip(
    time_utc,
    lat,
    lon,
    charge,
    charging,
    rssi,
    snr,
    q,
):
    """Round-trip a full telemetry packet through ``pack`` / ``from_packed``.

    Covers the time, location, battery and physical-link sections. Coordinates
    are compared with a small absolute tolerance; everything else must be equal.
    """
    packed = Telemeter.pack(
        time_utc=time_utc,
        location={"latitude": lat, "longitude": lon},
        battery={"charge_percent": charge, "charging": charging},
        physical_link={"rssi": rssi, "snr": snr, "q": q},
    )

    unpacked = Telemeter.from_packed(packed)
    assert unpacked is not None

    assert unpacked["time"]["utc"] == time_utc

    location_out = unpacked["location"]
    assert math.isclose(location_out["latitude"], lat, abs_tol=1e-6)
    assert math.isclose(location_out["longitude"], lon, abs_tol=1e-6)

    battery_out = unpacked["battery"]
    assert battery_out["charge_percent"] == charge
    assert battery_out["charging"] == charging

    link_out = unpacked["physical_link"]
    assert link_out["rssi"] == rssi
    assert link_out["snr"] == snr
    assert link_out["q"] == q
# fullmatch=True keeps generated strings to exactly an optional "#" plus six
# hex digits. With the previous "^...$" pattern, "$" also matches just before a
# trailing newline, so inputs such as "abcdef\n" were being generated.
@given(hex_val=st.from_regex(r"#?[0-9a-fA-F]{6}", fullmatch=True))
def test_colour_utils_hex_to_byte_array(hex_val):
    """A six-digit hex colour (optionally ``#``-prefixed) converts to its 3 raw bytes."""
    result = ColourUtils.hex_colour_to_byte_array(hex_val)
    assert len(result) == 3
    # Verify against a manual conversion; only a single leading "#" is removed.
    expected = bytes.fromhex(hex_val.removeprefix("#"))
    assert result == expected
@given(
    val=st.one_of(
        st.sampled_from(
            ["1", "true", "yes", "on", "0", "false", "no", "off", "random"],
        ),
        st.none(),
    ),
)
def test_parse_bool_query_param(val):
    """Only the truthy spellings yield ``True``; ``None`` and anything else yield ``False``."""
    truthy_spellings = {"1", "true", "yes", "on"}
    expected = val is not None and val.lower() in truthy_spellings
    assert parse_bool_query_param(val) is expected
@given(data=st.binary())
def test_parse_lxmf_display_name_robustness(data):
    """Arbitrary bytes must never make ``parse_lxmf_display_name`` raise."""
    try:
        parse_lxmf_display_name(data)
    except Exception as exc:
        pytest.fail(f"parse_lxmf_display_name crashed: {exc}")
@given(data=st.binary())
def test_parse_lxmf_propagation_node_app_data_robustness(data):
    """Arbitrary bytes must not crash the parser; any non-None result is a well-formed dict.

    Only the parser call is guarded by ``try`` so that a failed shape assertion
    is reported as an assertion failure rather than being re-labelled as a
    parser crash.
    """
    try:
        result = parse_lxmf_propagation_node_app_data(data)
    except Exception as exc:
        pytest.fail(f"parse_lxmf_propagation_node_app_data crashed: {exc}")

    # None is an accepted outcome for this test; nothing more to check.
    if result is None:
        return

    assert isinstance(result, dict)
    assert "enabled" in result
    assert "timebase" in result
    assert "per_transfer_limit" in result
@given(
    names=st.lists(
        st.text(min_size=1).filter(lambda x: "]" not in x and x.strip()),
        min_size=1,
        max_size=5,
    ),
    keys=st.lists(
        st.text(min_size=1).filter(
            lambda x: "=" not in x and "]" not in x and x.strip(),
        ),
        min_size=1,
        max_size=5,
    ),
    values=st.lists(st.text().filter(lambda x: "\n" not in x), min_size=1, max_size=5),
)
def test_interface_config_parser_best_effort_property(names, keys, values):
    """A config with unterminated section headers must still parse to a list.

    Every section header is written as ``[[name`` (missing the closing ``]]``)
    to push the parser onto its best-effort path. Only the parse call is
    guarded by ``try`` so a wrong return type shows up as an assertion failure
    instead of being reported as a crash.
    """
    # The same key/value lines are repeated under every (corrupt) section.
    option_lines = [f" {k} = {v}" for k, v in zip(keys, values, strict=False)]

    config_lines = ["[interfaces]"]
    for name in names:
        config_lines.append(f"[[{name}")  # Intentionally missing closing ]]
        config_lines.extend(option_lines)
    config_text = "\n".join(config_lines)

    try:
        # Should not crash and should hopefully find some interfaces
        interfaces = InterfaceConfigParser.parse(config_text)
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse best-effort crashed: {exc}")

    assert isinstance(interfaces, list)
@given(data=st.binary())
def test_parse_lxmf_stamp_cost_robustness(data):
    """Arbitrary bytes must never make ``parse_lxmf_stamp_cost`` raise."""
    try:
        parse_lxmf_stamp_cost(data)
    except Exception as exc:
        pytest.fail(f"parse_lxmf_stamp_cost crashed: {exc}")
@given(name=st.text())
def test_parse_nomadnetwork_node_display_name_robustness(name):
    """Arbitrary text must never make ``parse_nomadnetwork_node_display_name`` raise."""
    try:
        parse_nomadnetwork_node_display_name(name)
    except Exception as exc:
        pytest.fail(f"parse_nomadnetwork_node_display_name crashed: {exc}")
@given(packed=st.binary())
def test_telemeter_from_packed_robustness(packed):
    """Arbitrary bytes must never make ``Telemeter.from_packed`` raise."""
    try:
        Telemeter.from_packed(packed)
    except Exception as exc:
        pytest.fail(f"Telemeter.from_packed crashed: {exc}")
@given(text=st.text())
def test_markdown_renderer_no_crash(text):
    """Arbitrary text must never make ``MarkdownRenderer.render`` raise."""
    try:
        MarkdownRenderer.render(text)
    except Exception as exc:
        pytest.fail(f"MarkdownRenderer.render crashed: {exc}")
@given(text=st.text())
def test_interface_config_parser_no_crash(text):
    """Arbitrary text must never make ``InterfaceConfigParser.parse`` raise."""
    try:
        InterfaceConfigParser.parse(text)
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse crashed: {exc}")
@given(
    names=st.lists(
        st.text(
            min_size=1,
            alphabet=st.characters(
                blacklist_categories=("Cc", "Cs"),
                blacklist_characters="[]",
            ),
        ).filter(lambda x: x.strip() == x and x),
        min_size=1,
        max_size=5,
        unique=True,
    ),
    keys=st.lists(
        st.text(
            min_size=1,
            alphabet=st.characters(
                blacklist_categories=("Cc", "Cs"),
                blacklist_characters="[]=",
            ),
        ).filter(lambda x: x.strip() == x and x),
        min_size=1,
        max_size=5,
        unique=True,
    ),
    values=st.lists(
        st.text(alphabet=st.characters(blacklist_categories=("Cc", "Cs"))).filter(
            lambda x: "\n" not in x,
        ),
        min_size=1,
        max_size=5,
    ),
)
def test_interface_config_parser_structured(names, keys, values):
    """Well-formed ``[[name]]`` sections parse, and every result names a known section.

    Only the parse call is guarded by ``try`` so that a wrong or missing name
    is reported as an assertion failure rather than as a parser failure.
    """
    # The same key/value lines are repeated under every section.
    option_lines = [f" {k} = {v}" for k, v in zip(keys, values, strict=False)]

    config_lines = ["[interfaces]"]
    for name in names:
        config_lines.append(f"[[{name}]]")
        config_lines.extend(option_lines)
    config_text = "\n".join(config_lines)

    try:
        interfaces = InterfaceConfigParser.parse(config_text)
    except Exception as exc:
        pytest.fail(f"InterfaceConfigParser.parse failed on structured input: {exc}")

    # len(interfaces) == len(names) is deliberately not asserted: per the
    # original author's note, some unusual characters might still cause a
    # section to be skipped, even with the filtered alphabet.
    for iface in interfaces:
        assert "name" in iface
        # The generated names are already stripped, so whatever name the
        # parser reports should be one of them verbatim.
        assert iface["name"] in names
@given(
    interfaces=st.lists(
        st.dictionaries(
            keys=st.sampled_from(
                [
                    "name",
                    "type",
                    "reachable_on",
                    "target_host",
                    "remote",
                    "listen_ip",
                    "port",
                    "target_port",
                    "listen_port",
                    "discovery_hash",
                    "transport_id",
                    "network_id",
                ],
            ),
            values=st.one_of(st.text(), st.integers(), st.none()),
            max_size=12,
        ),
        max_size=40,
    ),
    whitelist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()),
    blacklist=st.one_of(st.text(), st.lists(st.text(), max_size=20), st.none()),
)
def test_discovery_filter_robustness(interfaces, whitelist, blacklist):
    """Filtering discovered interfaces never raises and never grows the list.

    Interfaces are arbitrary partial dicts; the whitelist and blacklist may
    each be a string, a list of strings, or ``None``.
    """
    try:
        kept = ReticulumMeshChat.filter_discovered_interfaces(
            interfaces,
            whitelist,
            blacklist,
        )
    except Exception as exc:
        pytest.fail(f"Discovery filtering crashed: {exc}")

    # The result must be a list no longer than the input.
    assert isinstance(kept, list)
    assert len(kept) <= len(interfaces)
# Strategy for a database message row.
#
# "created_at" and "updated_at" are always present; every other column may or
# may not appear. The dict is built directly with fixed_dictionaries instead of
# generating arbitrary key subsets and filtering, which discarded most draws.
_st_db_message_value = st.one_of(
    st.none(),
    st.integers(),
    st.floats(allow_nan=False, allow_infinity=False),
    st.text(),
    st.booleans(),
    st.binary().map(lambda b: b.hex()),
)
_DB_MESSAGE_OPTIONAL_COLUMNS = (
    "id",
    "hash",
    "source_hash",
    "destination_hash",
    "is_incoming",
    "state",
    "progress",
    "method",
    "delivery_attempts",
    "next_delivery_attempt_at",
    "title",
    "content",
    "fields",
    "timestamp",
    "rssi",
    "snr",
    "quality",
    "is_spam",
)
st_db_message = st.fixed_dictionaries(
    {
        "created_at": _st_db_message_value,
        "updated_at": _st_db_message_value,
    },
    optional={
        column: _st_db_message_value for column in _DB_MESSAGE_OPTIONAL_COLUMNS
    },
)
@settings(
    suppress_health_check=[
        HealthCheck.too_slow,
        HealthCheck.filter_too_much,
    ],
)
@given(db_message=st_db_message)
def test_convert_db_lxmf_message_to_dict_robustness(db_message):
    """Feed a loosely-typed message row to ``convert_db_lxmf_message_to_dict``.

    Missing columns are filled with ``None`` and an unparseable ``fields``
    value is replaced with an empty JSON object before the call.
    """
    # Fill in any column the function requires but the strategy did not generate.
    for column in (
        "id",
        "hash",
        "source_hash",
        "destination_hash",
        "is_incoming",
        "state",
        "progress",
        "method",
        "delivery_attempts",
        "next_delivery_attempt_at",
        "title",
        "content",
        "fields",
        "timestamp",
        "rssi",
        "snr",
        "quality",
        "is_spam",
        "created_at",
        "updated_at",
    ):
        db_message.setdefault(column, None)

    # Ensure "fields" is a valid JSON string whenever it is not None.
    raw_fields = db_message["fields"]
    if raw_fields is not None:
        try:
            json.loads(raw_fields)
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError subclass; TypeError covers
            # non-string values such as ints, floats and bools.
            db_message["fields"] = "{}"

    try:
        convert_db_lxmf_message_to_dict(db_message)
    except Exception:
        # We expect some errors if data is really weird, but it shouldn't crash the whole thing.
        # NOTE(review): with every Exception swallowed, this test can only
        # fail on something that is not an Exception subclass.
        pass
@given(data=st.dictionaries(keys=st.text(), values=st.text()))
def test_convert_nomadnet_field_data_to_map(data):
    """Every input key reappears as ``field_<key>`` with its value unchanged."""
    mapped = convert_nomadnet_field_data_to_map(data)
    assert len(mapped) == len(data)
    for key, value in data.items():
        prefixed_key = f"field_{key}"
        assert mapped[prefixed_key] == value
@given(
    data=st.dictionaries(
        keys=st.text().filter(lambda x: "=" not in x and "|" not in x and x),
        values=st.text().filter(lambda x: "|" not in x),
    ),
)
def test_convert_nomadnet_string_data_to_map_roundtrip(data):
    """A ``key1=val1|key2=val2`` string parses back to ``var_<key>`` entries.

    Keys are non-empty and contain neither ``=`` nor ``|``; values contain no ``|``.
    """
    # Serialise the dict into the pipe-separated key=value form.
    pairs = (f"{key}={value}" for key, value in data.items())
    input_str = "|".join(pairs)

    parsed = convert_nomadnet_string_data_to_map(input_str)

    assert len(parsed) == len(data)
    for key, value in data.items():
        prefixed_key = f"var_{key}"
        assert parsed[prefixed_key] == value
@given(text=st.text())
def test_markdown_renderer_xss_protection(text):
# Basic check: if we use {text}"
result = MarkdownRenderer.render(input_text)
assert "