Files
MeshChatX/tests/backend/test_lxmf_communication.py

1668 lines
66 KiB
Python

# SPDX-License-Identifier: 0BSD
"""Integration tests for LXMF messaging and Reticulum communication.
Covers stamp proof-of-work, message packing/unpacking, signature validation,
delivery pipelines (direct, opportunistic, propagated), propagation node
operations, stamp enforcement, deduplication, and failure handling.
Stamp tests run in-process (no Reticulum required, fast).
Protocol and delivery tests run in isolated subprocesses to respect the
Reticulum singleton constraint.
Enable subprocess tests: MESHCHAT_LIVE_RETICULUM=1
"""
import json
import os
import subprocess
import sys
import textwrap
import pytest
import RNS
from LXMF import LXStamper
_RUN = os.environ.get("MESHCHAT_LIVE_RETICULUM") == "1"
_MINIMAL_RNS_CONFIG = """\
[reticulum]
enable_transport = False
share_instance = No
panic_on_interface_error = No
[interfaces]
"""
def _run_lxmf_script(script_body, timeout=120):
return subprocess.run(
[sys.executable, "-c", script_body],
capture_output=True,
text=True,
timeout=timeout,
check=False,
)
def _parse_result(proc):
assert proc.returncode == 0, f"Script failed:\n{proc.stderr}\n{proc.stdout}"
lines = proc.stdout.strip().splitlines()
for line in reversed(lines):
line = line.strip()
if line.startswith("{"):
return json.loads(line)
raise ValueError(f"No JSON in output:\n{proc.stdout}")
# Shared preamble injected into subprocess scripts.
_SUBPROCESS_PREAMBLE = textwrap.dedent(f"""\
import tempfile, os, json, time, threading, shutil
import RNS, LXMF
import LXMF.LXStamper as LXStamper
_tmpdir = tempfile.mkdtemp(prefix="meshchat_lxmf_test_")
_config_path = os.path.join(_tmpdir, "config")
with open(_config_path, "w") as f:
f.write({_MINIMAL_RNS_CONFIG!r})
_reticulum = RNS.Reticulum(configdir=_tmpdir, loglevel=RNS.LOG_NONE)
def _emit(data):
import sys as _sys
print(json.dumps(data), flush=True)
_sys.stdout.flush()
def _cleanup():
try:
RNS.Reticulum.exit_handler()
except Exception:
pass
shutil.rmtree(_tmpdir, ignore_errors=True)
""")
# ────────────────────────────────────────────────────────────────
# 1. Stamp proof-of-work (in-process, no Reticulum needed)
# ────────────────────────────────────────────────────────────────
class TestStampSolving:
def test_workblock_deterministic(self):
material = os.urandom(32)
wb1 = LXStamper.stamp_workblock(material, expand_rounds=10)
wb2 = LXStamper.stamp_workblock(material, expand_rounds=10)
assert wb1 == wb2
def test_workblock_unique_per_material(self):
wb1 = LXStamper.stamp_workblock(b"alpha", expand_rounds=10)
wb2 = LXStamper.stamp_workblock(b"bravo", expand_rounds=10)
assert wb1 != wb2
def test_generate_and_validate_cost_1(self):
mid = os.urandom(32)
stamp, value = LXStamper.generate_stamp(mid, stamp_cost=1)
assert stamp is not None
assert value >= 1
wb = LXStamper.stamp_workblock(mid)
assert LXStamper.stamp_valid(stamp, 1, wb)
def test_generate_and_validate_cost_4(self):
mid = os.urandom(32)
stamp, value = LXStamper.generate_stamp(mid, stamp_cost=4)
assert stamp is not None
assert value >= 4
wb = LXStamper.stamp_workblock(mid)
assert LXStamper.stamp_valid(stamp, 4, wb)
def test_stamp_value_equals_reported(self):
mid = os.urandom(32)
stamp, value = LXStamper.generate_stamp(mid, stamp_cost=2)
wb = LXStamper.stamp_workblock(mid)
assert LXStamper.stamp_value(wb, stamp) == value
def test_random_stamp_rejected_at_high_cost(self):
mid = os.urandom(32)
bad = os.urandom(32)
wb = LXStamper.stamp_workblock(mid)
assert not LXStamper.stamp_valid(bad, 32, wb)
def test_stamp_invalid_for_different_message(self):
mid_a = os.urandom(32)
stamp, _ = LXStamper.generate_stamp(mid_a, stamp_cost=4)
wb_a = LXStamper.stamp_workblock(mid_a)
assert LXStamper.stamp_valid(stamp, 4, wb_a)
for _ in range(512):
mid_b = os.urandom(32)
wb_b = LXStamper.stamp_workblock(mid_b)
if not LXStamper.stamp_valid(stamp, 4, wb_b):
return
pytest.fail("stamp unexpectedly validated against many random workblocks")
def test_propagation_node_stamp_rounds(self):
mid = os.urandom(32)
stamp, value = LXStamper.generate_stamp(
mid,
stamp_cost=2,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN,
)
assert stamp is not None
wb = LXStamper.stamp_workblock(
mid,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN,
)
assert LXStamper.stamp_valid(stamp, 2, wb)
def test_peering_key_generation_and_validation(self):
peer_id = os.urandom(32)
stamp, _ = LXStamper.generate_stamp(
peer_id,
stamp_cost=2,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING,
)
assert LXStamper.validate_peering_key(peer_id, stamp, 2)
def test_peering_key_rejected_for_wrong_id(self):
"""Ensure wrong peer id fails peering validation.
Pick a peer_b for which the stamp does not accidentally satisfy the
target cost. With cost=8 there is a 1/256 chance of a random workblock
producing a hash that already meets the threshold, so retry until we
get a peer_b that genuinely fails validation. This avoids CI flakes
without inflating stamp_cost (and therefore generation time).
"""
peer_a = os.urandom(32)
stamp, _ = LXStamper.generate_stamp(
peer_a,
stamp_cost=8,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING,
)
for _ in range(64):
peer_b = os.urandom(32)
if peer_b == peer_a:
continue
if not LXStamper.validate_peering_key(peer_b, stamp, 8):
return
pytest.fail("could not find a peer_b that rejects peer_a's stamp")
def test_pn_stamp_valid_transient_data(self):
from LXMF.LXMessage import LXMessage
overhead = LXMessage.LXMF_OVERHEAD + LXStamper.STAMP_SIZE
fake_lxm = os.urandom(overhead + 64)
t_id = RNS.Identity.full_hash(fake_lxm)
stamp, _ = LXStamper.generate_stamp(
t_id,
stamp_cost=2,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN,
)
td = fake_lxm + stamp
r_id, r_data, r_val, r_stamp = LXStamper.validate_pn_stamp(td, 2)
assert r_id is not None
assert r_data == fake_lxm
assert r_val >= 2
assert r_stamp == stamp
def test_pn_stamp_rejected_bad_stamp(self):
from LXMF.LXMessage import LXMessage
overhead = LXMessage.LXMF_OVERHEAD + LXStamper.STAMP_SIZE
fake_lxm = os.urandom(overhead + 64)
bad_stamp = os.urandom(LXStamper.STAMP_SIZE)
td = fake_lxm + bad_stamp
r_id, _, _, _ = LXStamper.validate_pn_stamp(td, 16)
assert r_id is None
def test_pn_stamp_batch_validation(self):
from LXMF.LXMessage import LXMessage
overhead = LXMessage.LXMF_OVERHEAD + LXStamper.STAMP_SIZE
items = []
for _ in range(5):
fake = os.urandom(overhead + 64)
t_id = RNS.Identity.full_hash(fake)
stamp, _ = LXStamper.generate_stamp(
t_id,
stamp_cost=2,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN,
)
items.append(fake + stamp)
results = LXStamper.validate_pn_stamps(items, 2)
assert len(results) == 5
def test_stamp_cost_boundary_zero(self):
mid = os.urandom(32)
wb = LXStamper.stamp_workblock(mid)
any_stamp = os.urandom(32)
assert LXStamper.stamp_valid(any_stamp, 0, wb)
def test_workblock_expand_rounds_affect_output(self):
material = os.urandom(32)
wb_10 = LXStamper.stamp_workblock(material, expand_rounds=10)
wb_20 = LXStamper.stamp_workblock(material, expand_rounds=20)
assert wb_10 != wb_20
assert len(wb_20) == 2 * len(wb_10)
# ────────────────────────────────────────────────────────────────
# 2. LXMessage protocol (subprocess, real crypto)
# ────────────────────────────────────────────────────────────────
class TestLXMessageProtocol:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_pack_unpack_roundtrip(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
sender_dest = RNS.Destination(
sender_id, RNS.Destination.IN,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
receiver_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
receiver_dest_out, sender_dest,
content="Hello from integration test",
title="Test Title",
fields={"custom": 42},
desired_method=LXMF.LXMessage.DIRECT,
)
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
_emit({
"content_match": unpacked.content_as_string() == "Hello from integration test",
"title_match": unpacked.title_as_string() == "Test Title",
"fields_match": unpacked.fields.get("custom") == 42,
"hash_match": unpacked.hash == msg.hash,
"sig_validated": unpacked.signature_validated,
"incoming": unpacked.incoming,
"dest_hash_match": unpacked.destination_hash == receiver_dest_out.hash,
"src_hash_match": unpacked.source_hash == sender_dest.hash,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_empty_content_message(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(r_dest, s_dest, content="", title="")
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
_emit({
"empty_content": unpacked.content_as_string() == "",
"empty_title": unpacked.title_as_string() == "",
"hash_valid": unpacked.hash is not None and len(unpacked.hash) == 32,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_tampered_payload_invalidates_signature(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(r_dest, s_dest, content="original", desired_method=LXMF.LXMessage.DIRECT)
msg.pack()
tampered = bytearray(msg.packed)
# Flip a byte in the payload section (after dest + src + sig)
payload_offset = 16 + 16 + 64
if payload_offset < len(tampered):
tampered[payload_offset] ^= 0xFF
try:
unpacked = LXMF.LXMessage.unpack_from_bytes(bytes(tampered))
sig_valid = unpacked.signature_validated
except Exception:
sig_valid = False
_emit({"signature_invalid_after_tamper": not sig_valid})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["signature_invalid_after_tamper"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_message_with_stamp(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(
r_dest, s_dest,
content="Stamped message",
stamp_cost=2,
)
msg.defer_stamp = False
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
has_stamp = unpacked.stamp is not None
stamp_validates = unpacked.validate_stamp(2) if has_stamp else False
_emit({
"has_stamp": has_stamp,
"stamp_validates": stamp_validates,
"content_ok": unpacked.content_as_string() == "Stamped message",
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_message_stamp_rejected_at_higher_cost(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(r_dest, s_dest, content="Low stamp", stamp_cost=2)
msg.defer_stamp = False
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
validates_at_2 = unpacked.validate_stamp(2)
# The stamp might or might not pass at cost 20 (extremely unlikely)
validates_at_20 = unpacked.validate_stamp(20)
_emit({
"passes_at_required": validates_at_2,
"rejected_at_higher": not validates_at_20,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["passes_at_required"]
assert data["rejected_at_higher"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_large_message_uses_resource_representation(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
large_content = "X" * 1000
msg = LXMF.LXMessage(
r_dest, s_dest,
content=large_content,
desired_method=LXMF.LXMessage.DIRECT,
)
msg.pack()
_emit({
"uses_resource": msg.representation == LXMF.LXMessage.RESOURCE,
"method_direct": msg.method == LXMF.LXMessage.DIRECT,
"packed_size": msg.packed_size,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["uses_resource"]
assert data["method_direct"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_opportunistic_falls_back_for_large_message(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
large_content = "Y" * 500
msg = LXMF.LXMessage(
r_dest, s_dest,
content=large_content,
desired_method=LXMF.LXMessage.OPPORTUNISTIC,
)
msg.pack()
_emit({
"fell_back_to_direct": msg.desired_method == LXMF.LXMessage.DIRECT,
"method_is_direct": msg.method == LXMF.LXMessage.DIRECT,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["fell_back_to_direct"]
assert data["method_is_direct"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_propagated_message_packing(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(
r_dest, s_dest,
content="Propagated message",
desired_method=LXMF.LXMessage.PROPAGATED,
)
msg.pack()
_emit({
"method_propagated": msg.method == LXMF.LXMessage.PROPAGATED,
"has_propagation_packed": msg.propagation_packed is not None,
"has_transient_id": msg.transient_id is not None,
"transient_id_length": len(msg.transient_id) if msg.transient_id else 0,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["method_propagated"]
assert data["has_propagation_packed"]
assert data["has_transient_id"]
assert data["transient_id_length"] == 32
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_paper_message_uri_generation(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(
r_dest, s_dest,
content="Paper msg",
desired_method=LXMF.LXMessage.PAPER,
)
msg.pack()
uri = msg.as_uri()
_emit({
"starts_with_lxm": uri.startswith("lxm://"),
"uri_length": len(uri),
"method_paper": msg.method == LXMF.LXMessage.PAPER,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["starts_with_lxm"]
assert data["uri_length"] > 10
# ────────────────────────────────────────────────────────────────
# 3. LXMRouter delivery pipeline (subprocess)
# ────────────────────────────────────────────────────────────────
class TestLXMRouterDelivery:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_direct_delivery_via_lxmf_delivery(self):
"""Pack a message from sender, feed bytes to receiver router, verify callback."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/bob")
bob_dest = router_b.register_delivery_identity(receiver_id, display_name="Bob")
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
def on_delivery(msg):
delivered.append({
"content": msg.content_as_string(),
"title": msg.title_as_string(),
"incoming": msg.incoming,
"dest_hash": msg.destination_hash.hex(),
"src_hash": msg.source_hash.hex(),
})
router_b.register_delivery_callback(on_delivery)
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
bob_dest_out, alice_dest,
content="Direct delivery test",
title="Integration",
desired_method=LXMF.LXMessage.DIRECT,
)
msg.pack()
result = router_b.lxmf_delivery(msg.packed, method=LXMF.LXMessage.DIRECT)
_emit({
"delivery_returned_true": result == True,
"callback_fired": len(delivered) == 1,
"content_correct": delivered[0]["content"] == "Direct delivery test" if delivered else False,
"title_correct": delivered[0]["title"] == "Integration" if delivered else False,
"marked_incoming": delivered[0]["incoming"] if delivered else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_delivery_with_stamp_enforcement(self):
"""Stamped message passes delivery when stamps are enforced."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(
identity=receiver_id,
storagepath=_tmpdir+"/bob",
enforce_stamps=True,
)
bob_dest = router_b.register_delivery_identity(
receiver_id, display_name="Bob", stamp_cost=2,
)
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
bob_dest_out, alice_dest,
content="Stamped delivery",
stamp_cost=2,
)
msg.defer_stamp = False
msg.pack()
result = router_b.lxmf_delivery(msg.packed, method=LXMF.LXMessage.DIRECT)
_emit({
"accepted": result == True,
"callback_fired": len(delivered) == 1,
"stamp_valid": delivered[0].stamp_valid if delivered else False,
"stamp_checked": delivered[0].stamp_checked if delivered else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_delivery_rejects_missing_stamp_when_enforced(self):
"""Message without stamp is dropped when stamp enforcement is on."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(
identity=receiver_id,
storagepath=_tmpdir+"/bob",
enforce_stamps=True,
)
bob_dest = router_b.register_delivery_identity(
receiver_id, display_name="Bob", stamp_cost=8,
)
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content="No stamp")
msg.pack()
result = router_b.lxmf_delivery(msg.packed, method=LXMF.LXMessage.DIRECT)
_emit({
"rejected": result == False,
"callback_not_fired": len(delivered) == 0,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["rejected"]
assert data["callback_not_fired"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_delivery_allows_missing_stamp_when_not_enforced(self):
"""Without enforce_stamps, messages pass even with insufficient stamp."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(
identity=receiver_id,
storagepath=_tmpdir+"/bob",
enforce_stamps=False,
)
bob_dest = router_b.register_delivery_identity(
receiver_id, display_name="Bob", stamp_cost=8,
)
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content="No stamp but allowed")
msg.pack()
result = router_b.lxmf_delivery(msg.packed, method=LXMF.LXMessage.DIRECT)
_emit({
"accepted": result == True,
"callback_fired": len(delivered) == 1,
"stamp_marked_invalid": not delivered[0].stamp_valid if delivered else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_duplicate_message_rejected(self):
"""Same message delivered twice: first accepted, second rejected as duplicate."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/bob")
bob_dest = router_b.register_delivery_identity(receiver_id, display_name="Bob")
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content="Dedupe test")
msg.pack()
first = router_b.lxmf_delivery(msg.packed)
second = router_b.lxmf_delivery(msg.packed)
_emit({
"first_accepted": first == True,
"second_rejected": second == False,
"callback_count": len(delivered),
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["first_accepted"]
assert data["second_rejected"]
assert data["callback_count"] == 1
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_duplicate_allowed_with_flag(self):
"""allow_duplicate=True lets the same message through twice."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/bob")
bob_dest = router_b.register_delivery_identity(receiver_id, display_name="Bob")
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content="Allow dupes")
msg.pack()
first = router_b.lxmf_delivery(msg.packed)
second = router_b.lxmf_delivery(msg.packed, allow_duplicate=True)
_emit({
"first_accepted": first == True,
"second_accepted": second == True,
"callback_count": len(delivered),
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["first_accepted"]
assert data["second_accepted"]
assert data["callback_count"] == 2
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_ignored_source_blocked(self):
"""Messages from an ignored source hash are silently dropped."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/bob")
bob_dest = router_b.register_delivery_identity(receiver_id, display_name="Bob")
alice_dest = router_a.register_delivery_identity(sender_id, display_name="Alice")
delivered = []
router_b.register_delivery_callback(lambda msg: delivered.append(msg))
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content="Blocked sender")
msg.pack()
router_b.ignored_list.append(alice_dest.hash)
result = router_b.lxmf_delivery(msg.packed)
_emit({
"blocked": result == False,
"no_callback": len(delivered) == 0,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["blocked"]
assert data["no_callback"]
# ────────────────────────────────────────────────────────────────
# 4. Propagation node operations (subprocess)
# ────────────────────────────────────────────────────────────────
class TestPropagationNode:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_enable_disable_propagation(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
node_id = RNS.Identity()
router = LXMF.LXMRouter(identity=node_id, storagepath=_tmpdir+"/node")
assert not router.propagation_node
router.enable_propagation()
enabled = router.propagation_node
router.disable_propagation()
disabled = not router.propagation_node
_emit({
"enabled": enabled,
"disabled": disabled,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["enabled"]
assert data["disabled"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_propagation_node_stores_message(self):
"""Feed a propagated message to a prop node and verify storage."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
import RNS.vendor.umsgpack as msgpack
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
node_id = RNS.Identity()
router_sender = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/sender")
router_node = LXMF.LXMRouter(identity=node_id, storagepath=_tmpdir+"/node")
router_node.enable_propagation()
alice_dest = router_sender.register_delivery_identity(sender_id, display_name="Alice")
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
bob_dest_out, alice_dest,
content="Store on prop node",
desired_method=LXMF.LXMessage.PROPAGATED,
)
msg.pack()
unpacked_transfer = msgpack.unpackb(msg.propagation_packed)
raw_lxmf_data = unpacked_transfer[1][0]
entries_before = len(router_node.propagation_entries)
result = router_node.lxmf_propagation(
raw_lxmf_data, stamp_value=0, stamp_data=b"",
)
entries_after = len(router_node.propagation_entries)
_emit({
"stored": entries_after > entries_before,
"entry_count": entries_after,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["stored"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_propagation_stamp_cost_enforcement(self):
"""Prop node stores messages with valid stamp_value, rejects without."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
import RNS.vendor.umsgpack as msgpack
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
node_id = RNS.Identity()
router_sender = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/sender")
router_node = LXMF.LXMRouter(
identity=node_id,
storagepath=_tmpdir+"/node",
propagation_cost=14,
)
router_node.enable_propagation()
alice_dest = router_sender.register_delivery_identity(sender_id, display_name="Alice")
bob_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg1 = LXMF.LXMessage(
bob_dest_out, alice_dest,
content="No PN stamp",
desired_method=LXMF.LXMessage.PROPAGATED,
)
msg1.pack()
raw1 = msgpack.unpackb(msg1.propagation_packed)[1][0]
entries_before = len(router_node.propagation_entries)
router_node.lxmf_propagation(raw1, stamp_value=0, stamp_data=b"")
no_stamp_stored = len(router_node.propagation_entries) > entries_before
msg2 = LXMF.LXMessage(
bob_dest_out, alice_dest,
content="With PN stamp",
desired_method=LXMF.LXMessage.PROPAGATED,
)
msg2.pack()
raw2 = msgpack.unpackb(msg2.propagation_packed)[1][0]
t_id = RNS.Identity.full_hash(raw2)
stamp, value = LXStamper.generate_stamp(
t_id, stamp_cost=14,
expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PN,
)
entries_before2 = len(router_node.propagation_entries)
router_node.lxmf_propagation(raw2, stamp_value=value, stamp_data=stamp)
stamped_stored = len(router_node.propagation_entries) > entries_before2
_emit({
"no_stamp_stored_anyway": no_stamp_stored,
"stamped_stored": stamped_stored,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script, timeout=180))
assert data["stamped_stored"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_propagation_local_delivery(self):
"""Router with delivery identity decrypts and delivers propagated message."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
import RNS.vendor.umsgpack as msgpack
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/node")
router.register_delivery_identity(receiver_id, display_name="Receiver")
delivered = []
router.register_delivery_callback(lambda msg: delivered.append(msg))
sender_dest = RNS.Destination(
sender_id, RNS.Destination.IN,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
receiver_dest_out = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
receiver_dest_out, sender_dest,
content="Local prop delivery",
desired_method=LXMF.LXMessage.PROPAGATED,
)
msg.pack()
raw_data = msgpack.unpackb(msg.propagation_packed)[1][0]
router.lxmf_propagation(raw_data)
time.sleep(0.2)
_emit({
"delivered_locally": len(delivered) == 1,
"content_correct": delivered[0].content_as_string() == "Local prop delivery" if delivered else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["delivered_locally"]
assert data["content_correct"]
# ────────────────────────────────────────────────────────────────
# 5. LXMessage state machine and callbacks (subprocess)
# ────────────────────────────────────────────────────────────────
class TestMessageStateAndCallbacks:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_message_states_enum(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
_emit({
"generating": LXMF.LXMessage.GENERATING == 0x00,
"outbound": LXMF.LXMessage.OUTBOUND == 0x01,
"sending": LXMF.LXMessage.SENDING == 0x02,
"sent": LXMF.LXMessage.SENT == 0x04,
"delivered": LXMF.LXMessage.DELIVERED == 0x08,
"rejected": LXMF.LXMessage.REJECTED == 0xFD,
"cancelled": LXMF.LXMessage.CANCELLED == 0xFE,
"failed": LXMF.LXMessage.FAILED == 0xFF,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_delivery_method_constants(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
_emit({
"opportunistic": LXMF.LXMessage.OPPORTUNISTIC == 0x01,
"direct": LXMF.LXMessage.DIRECT == 0x02,
"propagated": LXMF.LXMessage.PROPAGATED == 0x03,
"paper": LXMF.LXMessage.PAPER == 0x05,
"valid_methods_count": len(LXMF.LXMessage.valid_methods) == 4,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_initial_message_state_is_generating(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg = LXMF.LXMessage(r_dest, s_dest, content="State test")
pre_pack_state = msg.state
msg.pack()
post_pack_state = msg.state
_emit({
"pre_pack_generating": pre_pack_state == LXMF.LXMessage.GENERATING,
"post_pack_generating": post_pack_state == LXMF.LXMessage.GENERATING,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_outbound_message_enters_pending(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router = LXMF.LXMRouter(identity=sender_id, storagepath=_tmpdir+"/sender")
src_dest = router.register_delivery_identity(sender_id, display_name="Sender")
dst_dest = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(dst_dest, src_dest, content="Outbound test")
pending_before = len(router.pending_outbound)
router.handle_outbound(msg)
time.sleep(0.2)
in_pending_or_failed = (
msg in router.pending_outbound or
msg in router.failed_outbound or
msg.state in [LXMF.LXMessage.SENT, LXMF.LXMessage.DELIVERED, LXMF.LXMessage.FAILED]
)
_emit({
"message_tracked": in_pending_or_failed,
"packed": msg.packed is not None,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["message_tracked"]
assert data["packed"]
# ────────────────────────────────────────────────────────────────
# 6. Multi-field message integrity (subprocess)
# ────────────────────────────────────────────────────────────────
class TestMessageFieldIntegrity:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_fields_survive_roundtrip(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
fields = {
"type": "chat",
"priority": 5,
"tags": ["urgent", "personal"],
"nested": {"key": "value"},
}
msg = LXMF.LXMessage(r_dest, s_dest, content="Fields test", fields=fields)
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
f = unpacked.fields
_emit({
"type_ok": f.get("type") == "chat",
"priority_ok": f.get("priority") == 5,
"tags_ok": f.get("tags") == ["urgent", "personal"],
"nested_ok": f.get("nested") == {"key": "value"},
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_binary_content_roundtrip(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
binary_data = bytes(range(256))
msg = LXMF.LXMessage(r_dest, s_dest, content=binary_data, title="Binary")
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
_emit({
"binary_preserved": list(unpacked.content) == list(binary_data),
"title_ok": unpacked.title_as_string() == "Binary",
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_unicode_content_roundtrip(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
s_id = RNS.Identity()
r_id = RNS.Identity()
s_dest = RNS.Destination(s_id, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
r_dest = RNS.Destination(r_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
unicode_text = "Mesh networking"
msg = LXMF.LXMessage(r_dest, s_dest, content=unicode_text, title=unicode_text)
msg.pack()
unpacked = LXMF.LXMessage.unpack_from_bytes(msg.packed)
_emit({
"content_ok": unpacked.content_as_string() == unicode_text,
"title_ok": unpacked.title_as_string() == unicode_text,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
# ────────────────────────────────────────────────────────────────
# 7. URI ingestion (subprocess)
# ────────────────────────────────────────────────────────────────
class TestURIIngestion:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_paper_uri_roundtrip(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
sender_id = RNS.Identity()
receiver_id = RNS.Identity()
router = LXMF.LXMRouter(identity=receiver_id, storagepath=_tmpdir+"/router")
router.register_delivery_identity(receiver_id, display_name="Receiver")
delivered = []
router.register_delivery_callback(lambda msg: delivered.append(msg))
s_dest = RNS.Destination(
sender_id, RNS.Destination.IN,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
r_dest = RNS.Destination(
receiver_id, RNS.Destination.OUT,
RNS.Destination.SINGLE, "lxmf", "delivery",
)
msg = LXMF.LXMessage(
r_dest, s_dest,
content="Paper URI test",
desired_method=LXMF.LXMessage.PAPER,
)
msg.pack()
uri = msg.as_uri()
router.ingest_lxm_uri(uri)
time.sleep(0.5)
_emit({
"delivered": len(delivered) == 1,
"content_match": delivered[0].content_as_string() == "Paper URI test" if delivered else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["delivered"]
assert data["content_match"]
# ────────────────────────────────────────────────────────────────
# 8. Router configuration and state (subprocess)
# ────────────────────────────────────────────────────────────────
class TestRouterConfiguration:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_single_delivery_identity_per_router(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
id_a = RNS.Identity()
id_b = RNS.Identity()
router = LXMF.LXMRouter(identity=id_a, storagepath=_tmpdir+"/router")
first = router.register_delivery_identity(id_a, display_name="A")
second = router.register_delivery_identity(id_b, display_name="B")
_emit({
"first_ok": first is not None,
"second_rejected": second is None,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["first_ok"]
assert data["second_rejected"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_stamp_cost_configuration(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
router = LXMF.LXMRouter(identity=identity, storagepath=_tmpdir+"/router")
dest = router.register_delivery_identity(identity, stamp_cost=5)
initial_cost = router.delivery_destinations[dest.hash].stamp_cost
router.set_inbound_stamp_cost(dest.hash, 10)
updated_cost = router.delivery_destinations[dest.hash].stamp_cost
router.set_inbound_stamp_cost(dest.hash, None)
cleared_cost = router.delivery_destinations[dest.hash].stamp_cost
_emit({
"initial_5": initial_cost == 5,
"updated_10": updated_cost == 10,
"cleared_none": cleared_cost is None,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_enforce_stamps_toggle(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
router = LXMF.LXMRouter(
identity=identity,
storagepath=_tmpdir+"/router",
enforce_stamps=False,
)
initial = router._enforce_stamps
router.enforce_stamps()
after_enforce = router._enforce_stamps
router.ignore_stamps()
after_ignore = router._enforce_stamps
_emit({
"initial_false": not initial,
"enforced": after_enforce,
"ignored": not after_ignore,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_outbound_propagation_node_set_and_read(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
router = LXMF.LXMRouter(identity=identity, storagepath=_tmpdir+"/router")
initial = router.get_outbound_propagation_node()
node_hash = os.urandom(16)
router.set_outbound_propagation_node(node_hash)
set_hash = router.get_outbound_propagation_node()
node_hash_2 = os.urandom(16)
router.set_outbound_propagation_node(node_hash_2)
updated_hash = router.get_outbound_propagation_node()
_emit({
"initial_none": initial is None,
"set_correctly": set_hash == node_hash,
"update_works": updated_hash == node_hash_2,
"different_nodes": node_hash != node_hash_2,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_router_max_delivery_attempts_constant(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
_emit({
"max_attempts": LXMF.LXMRouter.MAX_DELIVERY_ATTEMPTS,
"is_positive": LXMF.LXMRouter.MAX_DELIVERY_ATTEMPTS > 0,
"processing_interval": LXMF.LXMRouter.PROCESSING_INTERVAL,
"message_expiry": LXMF.LXMRouter.MESSAGE_EXPIRY,
"expiry_positive": LXMF.LXMRouter.MESSAGE_EXPIRY > 0,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["is_positive"]
assert data["expiry_positive"]
assert data["max_attempts"] == 5
# ────────────────────────────────────────────────────────────────
# 9. End-to-end two-router communication (subprocess)
# ────────────────────────────────────────────────────────────────
class TestTwoRouterCommunication:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_bidirectional_protocol_delivery(self):
"""Alice sends to Bob, then Bob sends to Alice via lxmf_delivery."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
alice_id = RNS.Identity()
bob_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=alice_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=bob_id, storagepath=_tmpdir+"/bob")
alice_dest = router_a.register_delivery_identity(alice_id, display_name="Alice")
bob_dest = router_b.register_delivery_identity(bob_id, display_name="Bob")
alice_inbox = []
bob_inbox = []
router_a.register_delivery_callback(lambda msg: alice_inbox.append(msg))
router_b.register_delivery_callback(lambda msg: bob_inbox.append(msg))
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
alice_dest_out = RNS.Destination(alice_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg_to_bob = LXMF.LXMessage(bob_dest_out, alice_dest, content="Hi Bob")
msg_to_bob.pack()
router_b.lxmf_delivery(msg_to_bob.packed)
msg_to_alice = LXMF.LXMessage(alice_dest_out, bob_dest, content="Hi Alice")
msg_to_alice.pack()
router_a.lxmf_delivery(msg_to_alice.packed)
_emit({
"bob_received": len(bob_inbox) == 1,
"bob_content": bob_inbox[0].content_as_string() == "Hi Bob" if bob_inbox else False,
"alice_received": len(alice_inbox) == 1,
"alice_content": alice_inbox[0].content_as_string() == "Hi Alice" if alice_inbox else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_multiple_messages_sequential_delivery(self):
"""Send several messages in sequence and verify all arrive in order."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
alice_id = RNS.Identity()
bob_id = RNS.Identity()
router_a = LXMF.LXMRouter(identity=alice_id, storagepath=_tmpdir+"/alice")
router_b = LXMF.LXMRouter(identity=bob_id, storagepath=_tmpdir+"/bob")
alice_dest = router_a.register_delivery_identity(alice_id, display_name="Alice")
bob_dest = router_b.register_delivery_identity(bob_id, display_name="Bob")
inbox = []
router_b.register_delivery_callback(lambda msg: inbox.append(msg.content_as_string()))
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
message_count = 10
for i in range(message_count):
msg = LXMF.LXMessage(bob_dest_out, alice_dest, content=f"Message {i}")
msg.pack()
router_b.lxmf_delivery(msg.packed, allow_duplicate=True)
expected = [f"Message {i}" for i in range(message_count)]
_emit({
"all_received": len(inbox) == message_count,
"order_preserved": inbox == expected,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["all_received"]
assert data["order_preserved"]
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_stamped_bidirectional_with_enforcement(self):
"""Both sides enforce stamps; both sides solve and verify."""
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
alice_id = RNS.Identity()
bob_id = RNS.Identity()
router_a = LXMF.LXMRouter(
identity=alice_id, storagepath=_tmpdir+"/alice", enforce_stamps=True,
)
router_b = LXMF.LXMRouter(
identity=bob_id, storagepath=_tmpdir+"/bob", enforce_stamps=True,
)
alice_dest = router_a.register_delivery_identity(alice_id, display_name="Alice", stamp_cost=2)
bob_dest = router_b.register_delivery_identity(bob_id, display_name="Bob", stamp_cost=2)
alice_inbox = []
bob_inbox = []
router_a.register_delivery_callback(lambda msg: alice_inbox.append(msg))
router_b.register_delivery_callback(lambda msg: bob_inbox.append(msg))
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
alice_dest_out = RNS.Destination(alice_id, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
msg_ab = LXMF.LXMessage(bob_dest_out, alice_dest, content="Stamped A->B", stamp_cost=2)
msg_ab.defer_stamp = False
msg_ab.pack()
r1 = router_b.lxmf_delivery(msg_ab.packed)
msg_ba = LXMF.LXMessage(alice_dest_out, bob_dest, content="Stamped B->A", stamp_cost=2)
msg_ba.defer_stamp = False
msg_ba.pack()
r2 = router_a.lxmf_delivery(msg_ba.packed)
_emit({
"ab_accepted": r1 == True,
"ba_accepted": r2 == True,
"bob_stamp_valid": bob_inbox[0].stamp_valid if bob_inbox else False,
"alice_stamp_valid": alice_inbox[0].stamp_valid if alice_inbox else False,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values()), f"Failures: {[k for k, v in data.items() if not v]}"
# ────────────────────────────────────────────────────────────────
# 10. Reticulum identity and destination basics (subprocess)
# ────────────────────────────────────────────────────────────────
class TestReticulumPrimitives:
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_identity_creation_and_key_operations(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
pub_key = identity.get_public_key()
priv_key = identity.get_private_key()
test_data = b"Reticulum mesh network"
signature = identity.sign(test_data)
valid = identity.validate(signature, test_data)
tampered = identity.validate(signature, b"tampered data")
ciphertext = identity.encrypt(test_data)
plaintext = identity.decrypt(ciphertext)
_emit({
"has_pub_key": pub_key is not None and len(pub_key) > 0,
"has_priv_key": priv_key is not None and len(priv_key) > 0,
"sig_valid": valid,
"tampered_invalid": not tampered,
"encrypt_decrypt_ok": plaintext == test_data,
"hash_length": len(identity.hash) if identity.hash else 0,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(v for k, v in data.items() if k != "hash_length")
assert data["hash_length"] == RNS.Reticulum.TRUNCATED_HASHLENGTH // 8
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_destination_hash_determinism(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
dest1 = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
dest2 = RNS.Destination(identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
_emit({
"same_hash": dest1.hash == dest2.hash,
"hash_length": len(dest1.hash),
"hexhash_length": len(dest1.hexhash),
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert data["same_hash"]
assert data["hash_length"] == 16
assert data["hexhash_length"] == 32
@pytest.mark.integration
@pytest.mark.skipif(not _RUN, reason="Set MESHCHAT_LIVE_RETICULUM=1")
def test_identity_persistence(self):
script = _SUBPROCESS_PREAMBLE + textwrap.dedent("""\
try:
identity = RNS.Identity()
id_path = os.path.join(_tmpdir, "test_identity")
identity.to_file(id_path)
loaded = RNS.Identity.from_file(id_path)
test_data = b"persistence check"
sig = identity.sign(test_data)
cross_valid = loaded.validate(sig, test_data)
_emit({
"file_exists": os.path.exists(id_path),
"loaded_ok": loaded is not None,
"same_pub_key": identity.get_public_key() == loaded.get_public_key(),
"cross_signature_valid": cross_valid,
})
finally:
_cleanup()
""")
data = _parse_result(_run_lxmf_script(script))
assert all(data.values())