feat(tests): add comprehensive tests for media APIs, including GIF, sticker, and sticker-pack functionalities, along with regression tests for media formats and HTTP integration.

This commit is contained in:
Ivan
2026-04-17 23:27:35 -05:00
parent 6a862a1641
commit 268fc8f864
34 changed files with 4184 additions and 29 deletions

View File

@@ -88,14 +88,14 @@
"method": "GET",
"path": "/api/v1/bots/status"
},
{
"method": "PATCH",
"path": "/api/v1/bots/update"
},
{
"method": "POST",
"path": "/api/v1/bots/stop"
},
{
"method": "PATCH",
"path": "/api/v1/bots/update"
},
{
"method": "GET",
"path": "/api/v1/community-interfaces"
@@ -248,6 +248,38 @@
"method": "POST",
"path": "/api/v1/favourites/{destination_hash}/rename"
},
{
"method": "GET",
"path": "/api/v1/gifs"
},
{
"method": "POST",
"path": "/api/v1/gifs"
},
{
"method": "GET",
"path": "/api/v1/gifs/export"
},
{
"method": "POST",
"path": "/api/v1/gifs/import"
},
{
"method": "DELETE",
"path": "/api/v1/gifs/{gif_id}"
},
{
"method": "PATCH",
"path": "/api/v1/gifs/{gif_id}"
},
{
"method": "GET",
"path": "/api/v1/gifs/{gif_id}/image"
},
{
"method": "POST",
"path": "/api/v1/gifs/{gif_id}/use"
},
{
"method": "GET",
"path": "/api/v1/identities"
@@ -376,10 +408,18 @@
"method": "PATCH",
"path": "/api/v1/lxmf/folders/{id}"
},
{
"method": "POST",
"path": "/api/v1/lxmf/propagation-node/restart"
},
{
"method": "GET",
"path": "/api/v1/lxmf/propagation-node/status"
},
{
"method": "POST",
"path": "/api/v1/lxmf/propagation-node/stop"
},
{
"method": "GET",
"path": "/api/v1/lxmf/propagation-node/stop-sync"
@@ -388,14 +428,6 @@
"method": "GET",
"path": "/api/v1/lxmf/propagation-node/sync"
},
{
"method": "POST",
"path": "/api/v1/lxmf/propagation-node/stop"
},
{
"method": "POST",
"path": "/api/v1/lxmf/propagation-node/restart"
},
{
"method": "GET",
"path": "/api/v1/lxmf/propagation-nodes"
@@ -416,6 +448,10 @@
"method": "DELETE",
"path": "/api/v1/maintenance/favourites"
},
{
"method": "DELETE",
"path": "/api/v1/maintenance/gifs"
},
{
"method": "DELETE",
"path": "/api/v1/maintenance/lxmf-icons"
@@ -596,6 +632,18 @@
"method": "GET",
"path": "/api/v1/reticulum/blackhole"
},
{
"method": "GET",
"path": "/api/v1/reticulum/config/raw"
},
{
"method": "PUT",
"path": "/api/v1/reticulum/config/raw"
},
{
"method": "POST",
"path": "/api/v1/reticulum/config/reset"
},
{
"method": "POST",
"path": "/api/v1/reticulum/disable-transport"
@@ -728,6 +776,38 @@
"method": "GET",
"path": "/api/v1/status"
},
{
"method": "GET",
"path": "/api/v1/sticker-packs"
},
{
"method": "POST",
"path": "/api/v1/sticker-packs"
},
{
"method": "POST",
"path": "/api/v1/sticker-packs/install"
},
{
"method": "POST",
"path": "/api/v1/sticker-packs/reorder"
},
{
"method": "DELETE",
"path": "/api/v1/sticker-packs/{pack_id}"
},
{
"method": "GET",
"path": "/api/v1/sticker-packs/{pack_id}"
},
{
"method": "PATCH",
"path": "/api/v1/sticker-packs/{pack_id}"
},
{
"method": "GET",
"path": "/api/v1/sticker-packs/{pack_id}/export"
},
{
"method": "GET",
"path": "/api/v1/stickers"

View File

@@ -0,0 +1,29 @@
# SPDX-License-Identifier: 0BSD
"""Shared bytes for media API and golden regression tests (PNG, GIF, gzip TGS)."""
from __future__ import annotations
import gzip
import json
TINY_PNG = (
bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) + b"\x00" * 32
)
TINY_GIF = b"GIF89a" + b"\x00" * 32
_LOTTIE_MIN = {
"v": "5.0.0",
"fr": 30.0,
"ip": 0.0,
"op": 60.0,
"w": 512,
"h": 512,
"nm": "golden",
"ddd": 0,
"assets": [],
"layers": [],
}
GZIP_TGS_512: bytes = gzip.compress(json.dumps(_LOTTIE_MIN).encode("ascii"))

View File

@@ -123,3 +123,17 @@ async def test_auto_propagation_api(mock_rns_minimal, temp_dir):
assert data["config"]["lxmf_propagation_sync_limit_in_bytes"] == 9_000_000
assert app_instance.config.lxmf_propagation_transfer_limit_in_bytes.get() == 250_000
assert app_instance.config.lxmf_propagation_sync_limit_in_bytes.get() == 9_000_000
mock_request = MagicMock()
mock_request.json = MagicMock(return_value=asyncio.Future())
mock_request.json.return_value.set_result(
{"lxmf_delivery_transfer_limit_in_bytes": 2_000_000_000},
)
response = await patch_handler(mock_request)
data = json.loads(response.body)
assert data["config"]["lxmf_delivery_transfer_limit_in_bytes"] == 1_000_000_000
assert (
app_instance.config.lxmf_delivery_transfer_limit_in_bytes.get() == 1_000_000_000
)
assert app_instance.message_router.delivery_per_transfer_limit == 1_000_000

View File

@@ -0,0 +1,182 @@
# SPDX-License-Identifier: 0BSD
"""Unit tests for gif validation and export/import document parsing."""
import base64
import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from meshchatx.src.backend import gif_utils
def test_normalize_image_type():
assert gif_utils.normalize_image_type("GIF") == "gif"
assert gif_utils.normalize_image_type("image/webp") == "webp"
assert gif_utils.normalize_image_type("png") is None
assert gif_utils.normalize_image_type("jpeg") is None
assert gif_utils.normalize_image_type("") is None
assert gif_utils.normalize_image_type(None) is None
def test_validate_gif_payload_ok_gif():
raw = b"GIF89a" + b"\x00" * 32
nt, h = gif_utils.validate_gif_payload(raw, "gif")
assert nt == "gif"
assert len(h) == 64
def test_validate_gif_payload_ok_webp():
raw = b"RIFF\x00\x00\x00\x00WEBP" + b"\x00" * 32
nt, _ = gif_utils.validate_gif_payload(raw, "webp")
assert nt == "webp"
def test_validate_gif_payload_too_large():
raw = b"x" * (gif_utils.MAX_GIF_BYTES + 1)
with pytest.raises(ValueError, match="image_too_large"):
gif_utils.validate_gif_payload(raw, "gif")
def test_validate_gif_payload_empty():
with pytest.raises(ValueError, match="empty_image"):
gif_utils.validate_gif_payload(b"", "gif")
def test_validate_gif_payload_bad_type():
with pytest.raises(ValueError, match="invalid_image_type"):
gif_utils.validate_gif_payload(b"GIF89a" + b"\x00" * 8, "png")
def test_validate_gif_payload_bad_magic():
with pytest.raises(ValueError, match="invalid_image_signature"):
gif_utils.validate_gif_payload(b"not-a-gif-file", "gif")
def test_validate_gif_payload_magic_type_mismatch():
raw = b"GIF89a" + b"\x00" * 16
with pytest.raises(ValueError, match="magic_type_mismatch"):
gif_utils.validate_gif_payload(raw, "webp")
def test_detect_image_format_from_magic():
assert gif_utils.detect_image_format_from_magic(b"GIF89a" + b"\x00" * 4) == "gif"
assert gif_utils.detect_image_format_from_magic(b"GIF87a" + b"\x00" * 4) == "gif"
webp = b"RIFF\x00\x00\x00\x00WEBP" + b"\x00" * 8
assert gif_utils.detect_image_format_from_magic(webp) == "webp"
assert gif_utils.detect_image_format_from_magic(b"\x89PNG\r\n\x1a\n") is None
assert gif_utils.detect_image_format_from_magic(b"") is None
assert gif_utils.detect_image_format_from_magic(b"short") is None
def test_sanitize_gif_name():
assert gif_utils.sanitize_gif_name(" hello ") == "hello"
assert gif_utils.sanitize_gif_name("") is None
assert gif_utils.sanitize_gif_name(None) is None
assert len(gif_utils.sanitize_gif_name("x" * 200) or "") == 128
def test_validate_export_document_ok():
raw = b"GIF89a" + b"\x00" * 32
b64 = base64.b64encode(raw).decode("ascii")
doc = {
"format": "meshchatx-gifs",
"version": 1,
"gifs": [
{"name": "g1", "image_type": "gif", "image_bytes": b64, "usage_count": 5},
],
}
items = gif_utils.validate_export_document(doc)
assert len(items) == 1
assert items[0]["image_bytes_b64"] == b64
assert items[0]["usage_count"] == 5
def test_validate_export_document_negative_usage_clamped():
raw = b"GIF89a" + b"\x00" * 32
b64 = base64.b64encode(raw).decode("ascii")
doc = {
"format": "meshchatx-gifs",
"version": 1,
"gifs": [
{"name": None, "image_type": "gif", "image_bytes": b64, "usage_count": -5},
],
}
items = gif_utils.validate_export_document(doc)
assert items[0]["usage_count"] == 0
def test_validate_export_document_wrong_format():
with pytest.raises(ValueError, match="invalid_format"):
gif_utils.validate_export_document(
{"format": "meshchatx-stickers", "version": 1, "gifs": []},
)
def test_validate_export_document_bad_version():
with pytest.raises(ValueError, match="unsupported_version"):
gif_utils.validate_export_document(
{"format": "meshchatx-gifs", "version": 99, "gifs": []},
)
def test_validate_export_document_missing_gifs_array():
with pytest.raises(ValueError, match="invalid_gifs_array"):
gif_utils.validate_export_document(
{"format": "meshchatx-gifs", "version": 1},
)
def test_validate_export_document_missing_image_bytes():
with pytest.raises(ValueError, match="missing_image_bytes_at_0"):
gif_utils.validate_export_document(
{
"format": "meshchatx-gifs",
"version": 1,
"gifs": [{"name": "x", "image_type": "gif", "image_bytes": ""}],
},
)
def test_validate_export_document_not_dict():
with pytest.raises(ValueError, match="invalid_document"):
gif_utils.validate_export_document([])
def test_mime_for_image_type():
assert gif_utils.mime_for_image_type("gif") == "image/gif"
assert gif_utils.mime_for_image_type("webp") == "image/webp"
assert gif_utils.mime_for_image_type("png") == "application/octet-stream"
def test_build_export_document_shape():
doc = gif_utils.build_export_document([{"x": 1}], "2026-01-01T00:00:00Z")
assert doc["format"] == "meshchatx-gifs"
assert doc["version"] == 1
assert doc["gifs"] == [{"x": 1}]
assert doc["exported_at"] == "2026-01-01T00:00:00Z"
@settings(max_examples=200, deadline=None)
@given(
raw=st.binary(min_size=0, max_size=4096),
typ=st.one_of(
st.none(),
st.text(max_size=40),
st.sampled_from(["gif", "webp", "image/gif", "image/webp", "png", ""]),
),
)
def test_validate_gif_payload_fuzz_never_raises_unexpected(raw, typ):
"""Fuzz: validation either succeeds or raises ValueError with known reasons."""
try:
gif_utils.validate_gif_payload(raw, typ)
except ValueError:
pass
@settings(max_examples=500, deadline=None)
@given(raw=st.binary(min_size=0, max_size=4096))
def test_detect_image_format_from_magic_fuzz_never_raises(raw):
out = gif_utils.detect_image_format_from_magic(raw)
assert out is None or out in {"gif", "webp"}

View File

@@ -0,0 +1,167 @@
# SPDX-License-Identifier: 0BSD
"""Tests for UserGifsDAO and schema migration for user_gifs."""
import base64
import pytest
from meshchatx.src.backend.database import Database
from meshchatx.src.backend.database.schema import DatabaseSchema
@pytest.fixture
def db(tmp_path):
path = tmp_path / "t.db"
database = Database(str(path))
database.initialize()
assert DatabaseSchema.LATEST_VERSION >= 45
return database
def _tiny_gif():
return b"GIF89a" + b"\x00" * 32
def _tiny_webp():
return b"RIFF\x00\x00\x00\x00WEBP" + b"\x00" * 32
def test_insert_and_list(db):
identity = "ab" * 16
raw = _tiny_gif()
row = db.gifs.insert(identity, "one", "gif", raw, None)
assert row is not None
assert row["id"] >= 1
assert row["usage_count"] == 0
assert row["last_used_at"] is None
listed = db.gifs.list_for_identity(identity)
assert len(listed) == 1
assert listed[0]["image_size"] == len(raw)
def test_insert_webp_ok(db):
identity = "ee" * 16
row = db.gifs.insert(identity, "w", "webp", _tiny_webp(), None)
assert row["image_type"] == "webp"
def test_duplicate_returns_none(db):
identity = "cd" * 16
raw = _tiny_gif()
r1 = db.gifs.insert(identity, None, "gif", raw, None)
assert r1 is not None
r2 = db.gifs.insert(identity, None, "gif", raw, None)
assert r2 is None
assert db.gifs.count_for_identity(identity) == 1
def test_record_usage_orders_by_most_used(db):
identity = "11" * 16
a = db.gifs.insert(identity, "a", "gif", _tiny_gif(), None)
b = db.gifs.insert(identity, "b", "gif", _tiny_gif() + b"x", None)
c = db.gifs.insert(identity, "c", "gif", _tiny_gif() + b"y", None)
db.gifs.record_usage(b["id"], identity)
db.gifs.record_usage(b["id"], identity)
db.gifs.record_usage(c["id"], identity)
rows = db.gifs.list_for_identity(identity)
ids = [r["id"] for r in rows]
# b (2 uses), c (1 use), a (0 uses)
assert ids == [b["id"], c["id"], a["id"]]
by_id = {r["id"]: r for r in rows}
assert by_id[b["id"]]["usage_count"] == 2
assert by_id[c["id"]]["usage_count"] == 1
assert by_id[a["id"]]["usage_count"] == 0
assert by_id[b["id"]]["last_used_at"] is not None
def test_record_usage_wrong_identity(db):
identity = "01" * 16
other = "02" * 16
row = db.gifs.insert(identity, None, "gif", _tiny_gif(), None)
assert db.gifs.record_usage(row["id"], other) is False
def test_delete_and_delete_all(db):
identity = "ef" * 16
db.gifs.insert(identity, "a", "gif", _tiny_gif(), None)
row = db.gifs.list_for_identity(identity)[0]
assert db.gifs.delete(row["id"], identity) is True
assert len(db.gifs.list_for_identity(identity)) == 0
db.gifs.insert(identity, "b", "gif", _tiny_gif(), None)
db.gifs.insert(identity, "c", "gif", _tiny_gif() + b"x", None)
n = db.gifs.delete_all_for_identity(identity)
assert n == 2
assert db.gifs.count_for_identity(identity) == 0
def test_get_row_image_blob(db):
identity = "aa" * 16
raw = _tiny_gif()
ins = db.gifs.insert(identity, "n", "gif", raw, "msg1")
full = db.gifs.get_row(ins["id"], identity)
assert full["image_blob"] == raw
assert full["source_message_hash"] == "msg1"
def test_update_name(db):
identity = "bb" * 16
ins = db.gifs.insert(identity, "old", "gif", _tiny_gif(), None)
assert db.gifs.update_name(ins["id"], identity, "new") is True
row = db.gifs.get_row(ins["id"], identity)
assert row["name"] == "new"
def test_export_and_import_roundtrip(db):
identity = "cc" * 16
raw = _tiny_gif()
ins = db.gifs.insert(identity, "a", "gif", raw, None)
db.gifs.record_usage(ins["id"], identity)
payloads = db.gifs.export_payloads_for_identity(identity)
assert len(payloads) == 1
assert base64.b64decode(payloads[0]["image_bytes"]) == raw
assert payloads[0]["usage_count"] == 1
other = "dd" * 16
items = [
{
"name": "x",
"image_type": "gif",
"image_bytes_b64": payloads[0]["image_bytes"],
"source_message_hash": None,
"usage_count": 7,
},
]
r = db.gifs.import_payloads(other, items, replace_duplicates=False)
assert r["imported"] == 1
rows = db.gifs.list_for_identity(other)
assert len(rows) == 1
assert rows[0]["usage_count"] == 7
r2 = db.gifs.import_payloads(other, items, replace_duplicates=False)
assert r2["skipped_duplicates"] == 1
def test_import_invalid_base64_skipped(db):
identity = "ff" * 16
items = [
{
"name": "x",
"image_type": "gif",
"image_bytes_b64": "!!!not-base64!!!",
"source_message_hash": None,
},
]
r = db.gifs.import_payloads(identity, items, replace_duplicates=False)
assert r["skipped_invalid"] >= 1
def test_gif_limit(db, monkeypatch):
from meshchatx.src.backend import gif_utils
monkeypatch.setattr(gif_utils, "MAX_GIFS_PER_IDENTITY", 2)
identity = "22" * 16
db.gifs.insert(identity, None, "gif", _tiny_gif(), None)
db.gifs.insert(identity, None, "gif", _tiny_gif() + b"y", None)
with pytest.raises(ValueError, match="gif_limit_reached"):
db.gifs.insert(identity, None, "gif", _tiny_gif() + b"z", None)

View File

@@ -0,0 +1,321 @@
# SPDX-License-Identifier: 0BSD
"""
Tests for encrypted IFAC values surfaced via interface discovery announces.
When an upstream interface is configured with publish_ifac = yes, RNS embeds
the network_name (ifac_netname) and passphrase (ifac_netkey) into the
discovery announce payload, plus a ready-to-paste config_entry block. These
tests verify that:
1. The /api/v1/reticulum/discovered-interfaces endpoint passes those fields
through to the frontend with both the raw RNS keys and the canonical
config-style aliases (network_name/passphrase).
2. ReticulumMeshChat.normalize_discovered_ifac_fields handles bytes payloads,
missing values, and non-list inputs without raising.
3. discovery_filter_candidates includes network_name so users can whitelist
or blacklist by IFAC network name.
"""
import json
import shutil
import tempfile
from unittest.mock import MagicMock, patch
import pytest
import RNS
from meshchatx.meshchat import ReticulumMeshChat
class ConfigDict(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.write_called = False
def write(self):
self.write_called = True
return True
@pytest.fixture
def temp_dir():
path = tempfile.mkdtemp()
try:
yield path
finally:
shutil.rmtree(path)
def build_identity():
identity = MagicMock(spec=RNS.Identity)
identity.hash = b"test_hash_32_bytes_long_01234567"
identity.hexhash = identity.hash.hex()
identity.get_private_key.return_value = b"test_private_key"
return identity
async def find_route_handler(app_instance, path, method):
for route in app_instance.get_routes():
if route.path == path and route.method == method:
return route.handler
return None
def test_normalize_handles_non_list_input():
assert (
ReticulumMeshChat.normalize_discovered_ifac_fields({"foo": "bar"})
== {"foo": "bar"}
)
assert ReticulumMeshChat.normalize_discovered_ifac_fields(None) is None
def test_normalize_aliases_string_ifac_fields():
interfaces = [
{
"name": "kin.earth",
"type": "BackboneInterface",
"reachable_on": "rns.kin.earth",
"port": 4242,
"ifac_netname": "kin.earth",
"ifac_netkey": "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
"config_entry": "[[kin.earth]]\n type = BackboneInterface\n enabled = yes",
},
]
normalized = ReticulumMeshChat.normalize_discovered_ifac_fields(interfaces)
assert normalized[0]["ifac_netname"] == "kin.earth"
assert normalized[0]["ifac_netkey"] == "asty8vT8spXNQdCnPVMATbCKkwUxuzG9"
assert normalized[0]["network_name"] == "kin.earth"
assert normalized[0]["passphrase"] == "asty8vT8spXNQdCnPVMATbCKkwUxuzG9"
assert normalized[0]["publish_ifac"] is True
assert normalized[0]["config_entry"].startswith("[[kin.earth]]")
def test_normalize_decodes_bytes_ifac_fields():
interfaces = [
{
"name": "bytes-iface",
"type": "BackboneInterface",
"ifac_netname": b"bytes_net",
"ifac_netkey": b"bytes_key",
"config_entry": b"[[bytes-iface]]\n type = BackboneInterface",
},
]
normalized = ReticulumMeshChat.normalize_discovered_ifac_fields(interfaces)
assert normalized[0]["network_name"] == "bytes_net"
assert normalized[0]["passphrase"] == "bytes_key"
assert normalized[0]["config_entry"].startswith("[[bytes-iface]]")
def test_normalize_missing_ifac_fields_yields_none_aliases():
interfaces = [
{
"name": "open-iface",
"type": "TCPClientInterface",
"reachable_on": "10.0.0.1",
"port": 4242,
},
]
normalized = ReticulumMeshChat.normalize_discovered_ifac_fields(interfaces)
assert normalized[0]["ifac_netname"] is None
assert normalized[0]["ifac_netkey"] is None
assert normalized[0]["network_name"] is None
assert normalized[0]["passphrase"] is None
assert normalized[0]["publish_ifac"] is False
assert normalized[0]["config_entry"] is None
def test_normalize_skips_non_dict_entries():
interfaces = [
"not a dict",
{"name": "ok", "ifac_netname": "n", "ifac_netkey": "k"},
]
normalized = ReticulumMeshChat.normalize_discovered_ifac_fields(interfaces)
assert normalized[0] == "not a dict"
assert normalized[1]["network_name"] == "n"
assert normalized[1]["passphrase"] == "k"
def test_discovery_filter_candidates_includes_network_name():
iface = {
"name": "node-1",
"type": "BackboneInterface",
"network_name": "kin.earth",
"ifac_netname": "kin.earth",
}
candidates = ReticulumMeshChat.discovery_filter_candidates(iface)
assert "kin.earth" in candidates
def test_filter_discovered_interfaces_whitelist_by_network_name():
interfaces = [
{"name": "good", "type": "BackboneInterface", "network_name": "kin.earth"},
{"name": "bad", "type": "BackboneInterface", "network_name": "other.net"},
]
filtered = ReticulumMeshChat.filter_discovered_interfaces(
interfaces,
whitelist_patterns="kin.earth",
blacklist_patterns="",
)
names = [i["name"] for i in filtered]
assert names == ["good"]
@pytest.mark.asyncio
async def test_discovered_interfaces_endpoint_surfaces_ifac(temp_dir):
"""End-to-end: HTTP endpoint exposes IFAC values from RNS announce."""
config = ConfigDict({"reticulum": {}, "interfaces": {}})
with (
patch("meshchatx.meshchat.generate_ssl_certificate"),
patch("RNS.Reticulum") as mock_rns,
patch("RNS.Transport"),
patch("LXMF.LXMRouter"),
patch("meshchatx.meshchat.InterfaceDiscovery") as mock_discovery_cls,
):
mock_reticulum = mock_rns.return_value
mock_reticulum.config = config
mock_reticulum.configpath = "/tmp/mock_config"
mock_reticulum.is_connected_to_shared_instance = False
mock_reticulum.transport_enabled.return_value = True
mock_reticulum.get_interface_stats.return_value = {"interfaces": []}
cfg_block = (
"[[kin.earth]]\n"
" type = BackboneInterface\n"
" enabled = yes\n"
" remote = rns.kin.earth\n"
" target_port = 4242\n"
" transport_identity = eea3d09f02143e157b3dae83060ee843\n"
" network_name = kin.earth\n"
" passphrase = asty8vT8spXNQdCnPVMATbCKkwUxuzG9"
)
mock_discovery_cls.return_value.list_discovered_interfaces.return_value = [
{
"name": "kin.earth",
"type": "BackboneInterface",
"reachable_on": "rns.kin.earth",
"port": 4242,
"transport_id": "eea3d09f02143e157b3dae83060ee843",
"network_id": "abc123",
"value": 16,
"hops": 1,
"status": "available",
"last_heard": 1700000000,
"ifac_netname": "kin.earth",
"ifac_netkey": "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
"config_entry": cfg_block,
},
{
"name": "open-node",
"type": "BackboneInterface",
"reachable_on": "open.example",
"port": 4242,
"transport_id": "ff" * 16,
"network_id": "def456",
"value": 8,
"hops": 2,
"status": "available",
"last_heard": 1700000001,
},
]
app_instance = ReticulumMeshChat(
identity=build_identity(),
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,
)
handler = await find_route_handler(
app_instance,
"/api/v1/reticulum/discovered-interfaces",
"GET",
)
assert handler
response = await handler(MagicMock())
data = json.loads(response.body)
ifaces = data["interfaces"]
assert len(ifaces) == 2
encrypted = next(i for i in ifaces if i["name"] == "kin.earth")
assert encrypted["ifac_netname"] == "kin.earth"
assert encrypted["ifac_netkey"] == "asty8vT8spXNQdCnPVMATbCKkwUxuzG9"
assert encrypted["network_name"] == "kin.earth"
assert encrypted["passphrase"] == "asty8vT8spXNQdCnPVMATbCKkwUxuzG9"
assert encrypted["publish_ifac"] is True
assert encrypted["config_entry"].startswith("[[kin.earth]]")
assert "network_name = kin.earth" in encrypted["config_entry"]
plain = next(i for i in ifaces if i["name"] == "open-node")
assert plain["network_name"] is None
assert plain["passphrase"] is None
assert plain["publish_ifac"] is False
@pytest.mark.asyncio
async def test_discovered_interfaces_filter_works_with_ifac_network_name(temp_dir):
"""Whitelist/blacklist patterns can match an announce by its network_name."""
config = ConfigDict(
{
"reticulum": {
"interface_discovery_whitelist": "kin.earth",
},
"interfaces": {},
},
)
with (
patch("meshchatx.meshchat.generate_ssl_certificate"),
patch("RNS.Reticulum") as mock_rns,
patch("RNS.Transport"),
patch("LXMF.LXMRouter"),
patch("meshchatx.meshchat.InterfaceDiscovery") as mock_discovery_cls,
):
mock_reticulum = mock_rns.return_value
mock_reticulum.config = config
mock_reticulum.configpath = "/tmp/mock_config"
mock_reticulum.is_connected_to_shared_instance = False
mock_reticulum.transport_enabled.return_value = True
mock_reticulum.get_interface_stats.return_value = {"interfaces": []}
mock_discovery_cls.return_value.list_discovered_interfaces.return_value = [
{
"name": "matching",
"type": "BackboneInterface",
"reachable_on": "10.0.0.1",
"port": 4242,
"ifac_netname": "kin.earth",
"ifac_netkey": "secret",
},
{
"name": "non-matching",
"type": "BackboneInterface",
"reachable_on": "10.0.0.2",
"port": 4242,
"ifac_netname": "other.net",
"ifac_netkey": "other",
},
]
app_instance = ReticulumMeshChat(
identity=build_identity(),
storage_dir=temp_dir,
reticulum_config_dir=temp_dir,
)
handler = await find_route_handler(
app_instance,
"/api/v1/reticulum/discovered-interfaces",
"GET",
)
assert handler
response = await handler(MagicMock())
data = json.loads(response.body)
names = [i["name"] for i in data["interfaces"]]
assert names == ["matching"]
assert data["interfaces"][0]["network_name"] == "kin.earth"
assert data["interfaces"][0]["passphrase"] == "secret"

View File

@@ -0,0 +1,256 @@
# SPDX-License-Identifier: 0BSD
"""Heavy property-based fuzzing for sticker/TGS/Lottie, WebM, GIFs, and pack JSON.
Run: pytest tests/backend/test_media_fuzzing.py
"""
from __future__ import annotations
import base64
import gzip
import json
import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from meshchatx.src.backend import gif_utils, sticker_pack_utils, sticker_utils
_JSON_LEAF = (
st.none()
| st.booleans()
| st.integers(min_value=-(2**53), max_value=2**53)
| st.floats(allow_nan=False, allow_infinity=False)
| st.text(max_size=64)
| st.binary(max_size=128)
)
def _recursive_json(max_leaves: int = 24):
return st.recursive(
_JSON_LEAF,
lambda children: st.lists(children, max_size=8)
| st.dictionaries(st.text(max_size=12), children, max_size=8),
max_leaves=max_leaves,
)
@settings(max_examples=400, deadline=None)
@given(raw=st.binary(min_size=0, max_size=sticker_utils.MAX_ANIMATED_BYTES + 2))
def test_parse_tgs_fuzz_never_raises_unexpected(raw):
try:
sticker_utils.parse_tgs(raw)
except ValueError:
pass
@settings(max_examples=350, deadline=None)
@given(
payload=st.dictionaries(
keys=st.text(max_size=8, alphabet=st.characters(blacklist_categories=("Cs",))),
values=_recursive_json(20),
max_size=24,
),
)
def test_parse_tgs_gzip_json_fuzz(payload):
merged = dict(payload)
merged.setdefault("v", "5.5.7")
merged.setdefault("fr", 30.0)
merged.setdefault("ip", 0.0)
merged.setdefault("op", 60.0)
merged.setdefault("w", 100)
merged.setdefault("h", 100)
raw = gzip.compress(json.dumps(merged, default=str).encode("utf-8", errors="surrogateescape"))
if len(raw) > sticker_utils.MAX_ANIMATED_BYTES:
raw = raw[: sticker_utils.MAX_ANIMATED_BYTES]
try:
sticker_utils.parse_tgs(raw)
except ValueError:
pass
@settings(max_examples=400, deadline=None)
@given(tail=st.binary(min_size=0, max_size=12_288))
def test_parse_webm_fuzz_never_raises_unexpected(tail):
raw = b"\x1a\x45\xdf\xa3" + tail
if len(raw) < 32:
raw = raw + b"\x00" * (32 - len(raw))
try:
sticker_utils.parse_webm(raw)
except ValueError:
pass
@settings(max_examples=300, deadline=None)
@given(
image_type=st.one_of(
st.none(),
st.text(max_size=48),
st.sampled_from(
["png", "jpeg", "webp", "gif", "bmp", "tgs", "webm", "svg", "image/png", ""],
),
),
raw=st.binary(min_size=0, max_size=8192),
)
def test_extract_metadata_fuzz_never_raises(image_type, raw):
it = image_type if isinstance(image_type, str) else "png"
sticker_utils.extract_metadata(it, raw)
@settings(max_examples=500, deadline=None)
@given(
raw=st.binary(min_size=0, max_size=8192),
typ=st.one_of(
st.none(),
st.text(max_size=48),
st.sampled_from(["png", "jpeg", "jpg", "webp", "gif", "bmp", "tgs", "webm", "svg", ""]),
),
strict=st.booleans(),
)
def test_validate_sticker_payload_fuzz_extended(raw, typ, strict):
try:
sticker_utils.validate_sticker_payload(raw, typ, strict=strict)
except ValueError:
pass
@settings(max_examples=250, deadline=None)
@given(
nt=st.sampled_from(["png", "jpeg", "webp", "gif", "bmp"]),
raw=st.binary(min_size=0, max_size=4096),
)
def test_detect_image_dimensions_fuzz_never_raises(nt, raw):
sticker_utils.detect_image_dimensions(nt, raw)
@settings(max_examples=600, deadline=None)
@given(name=st.one_of(st.none(), st.text(max_size=300)))
def test_sanitize_sticker_name_fuzz_never_raises(name):
sticker_utils.sanitize_sticker_name(name)
@settings(max_examples=600, deadline=None)
@given(emoji=st.one_of(st.none(), st.text(max_size=120)))
def test_sanitize_sticker_emoji_fuzz_never_raises(emoji):
sticker_utils.sanitize_sticker_emoji(emoji)
@settings(max_examples=400, deadline=None)
@given(t=st.text(max_size=120))
def test_mime_for_image_type_fuzz_never_raises(t):
sticker_utils.mime_for_image_type(t)
@settings(max_examples=400, deadline=None)
@given(
title=st.one_of(st.none(), st.text(max_size=200)),
short_name=st.one_of(st.none(), st.text(max_size=120)),
description=st.one_of(st.none(), st.text(max_size=400)),
pack_type=st.one_of(st.none(), st.text(max_size=40)),
)
def test_sticker_pack_sanitizers_fuzz_never_raises(title, short_name, description, pack_type):
sticker_pack_utils.sanitize_pack_title(title)
sticker_pack_utils.sanitize_pack_short_name(short_name)
sticker_pack_utils.sanitize_pack_description(description)
sticker_pack_utils.sanitize_pack_type(pack_type)
@settings(max_examples=250, deadline=None)
@given(doc=_recursive_json(28))
def test_validate_pack_document_fuzz_never_raises_unexpected(doc):
if not isinstance(doc, dict):
doc = {"x": doc}
try:
sticker_pack_utils.validate_pack_document(doc)
except ValueError:
pass
@settings(max_examples=300, deadline=None)
@given(
raw=st.binary(min_size=0, max_size=min(256 * 1024, gif_utils.MAX_GIF_BYTES + 1)),
typ=st.one_of(
st.none(),
st.text(max_size=48),
st.sampled_from(["gif", "webp", "image/gif", "image/webp", "png", ""]),
),
)
def test_validate_gif_payload_fuzz_extended(raw, typ):
try:
gif_utils.validate_gif_payload(raw, typ)
except ValueError:
pass
@settings(max_examples=400, deadline=None)
@given(name=st.one_of(st.none(), st.text(max_size=300)))
def test_sanitize_gif_name_fuzz_never_raises(name):
gif_utils.sanitize_gif_name(name)
@settings(max_examples=250, deadline=None)
@given(doc=_recursive_json(28))
def test_gif_validate_export_document_fuzz_never_raises_unexpected(doc):
if not isinstance(doc, dict):
doc = {"k": doc}
try:
gif_utils.validate_export_document(doc)
except ValueError:
pass
@settings(max_examples=200, deadline=None)
@given(
inner=st.dictionaries(
keys=st.text(max_size=12),
values=_recursive_json(16),
max_size=16,
),
)
def test_strict_tgs_from_structured_gzip_json_fuzz(inner):
doc = {
"v": "5.0.0",
"fr": 45.0,
"ip": 0.0,
"op": 90.0,
"w": 512,
"h": 512,
"nm": "fuzz",
"ddd": 0,
"assets": [],
"layers": [],
}
for k, v in inner.items():
if k not in doc:
doc[k] = v
raw = gzip.compress(json.dumps(doc, default=str).encode("ascii"))
if len(raw) > sticker_utils.MAX_ANIMATED_BYTES:
return
try:
sticker_utils.validate_sticker_payload(raw, "tgs", strict=True)
except ValueError:
pass
@settings(max_examples=150, deadline=None)
@given(
b64=st.one_of(st.text(max_size=400), st.binary(max_size=200).map(lambda b: base64.b64encode(b).decode("ascii"))),
)
def test_sticker_validate_export_document_sticker_rows_fuzz(b64):
doc = {
"format": "meshchatx-stickers",
"version": 1,
"stickers": [
{
"name": "n",
"image_type": "png",
"image_bytes": b64,
"emoji": None,
},
],
}
try:
sticker_utils.validate_export_document(doc)
except ValueError:
pass

View File

@@ -0,0 +1,283 @@
# SPDX-License-Identifier: 0BSD
"""HTTP integration tests for sticker, sticker-pack, and GIF APIs (aiohttp TestClient)."""
from __future__ import annotations
import base64
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from tests.backend.media_test_assets import GZIP_TGS_512, TINY_GIF, TINY_PNG
pytestmark = pytest.mark.usefixtures("require_loopback_tcp")
def _build_aio_app(app):
routes = web.RouteTableDef()
auth_mw, mime_mw, sec_mw = app._define_routes(routes)
aio_app = web.Application(middlewares=[auth_mw, mime_mw, sec_mw])
aio_app.add_routes(routes)
return aio_app
@pytest.fixture
def web_media_app(mock_app):
mock_app.current_context.running = True
mock_app.config.auth_enabled.set(False)
return mock_app
@pytest.mark.asyncio
async def test_stickers_list_empty(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.get("/api/v1/stickers")
assert r.status == 200
body = await r.json()
assert body == {"stickers": []}
@pytest.mark.asyncio
async def test_stickers_create_get_image_export_import_roundtrip(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(TINY_PNG).decode("ascii")
async with TestClient(TestServer(aio_app)) as client:
c = await client.post(
"/api/v1/stickers",
json={
"name": " A ",
"image_type": "png",
"image_bytes": b64,
"strict": False,
},
)
assert c.status == 200
st = (await c.json())["sticker"]
sid = st["id"]
img = await client.get(f"/api/v1/stickers/{sid}/image")
assert img.status == 200
assert img.headers.get("Content-Type", "").startswith("image/png")
raw = await img.read()
assert raw == TINY_PNG
ex = await client.get("/api/v1/stickers/export")
assert ex.status == 200
doc = await ex.json()
assert doc.get("format") == "meshchatx-stickers"
assert doc.get("version") == 1
assert isinstance(doc.get("stickers"), list)
assert len(doc["stickers"]) >= 1
imp = await client.post(
"/api/v1/stickers/import",
json={**doc, "replace_duplicates": True},
)
assert imp.status == 200
data = await imp.json()
assert data.get("imported", 0) >= 1
@pytest.mark.asyncio
async def test_stickers_create_missing_image_bytes(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post("/api/v1/stickers", json={"image_type": "png"})
assert r.status == 400
assert (await r.json()).get("error") == "missing_image_bytes"
@pytest.mark.asyncio
async def test_stickers_create_invalid_base64(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/stickers",
json={"image_type": "png", "image_bytes": "@@@not-valid-base64!!!"},
)
assert r.status == 400
assert (await r.json()).get("error") == "invalid_base64"
@pytest.mark.asyncio
async def test_stickers_create_bad_payload(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(b"not-a-png").decode("ascii")
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/stickers",
json={"image_type": "png", "image_bytes": b64},
)
assert r.status == 400
assert "error" in await r.json()
@pytest.mark.asyncio
async def test_stickers_image_not_found(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.get("/api/v1/stickers/999999/image")
assert r.status == 404
@pytest.mark.asyncio
async def test_stickers_import_invalid_document(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/stickers/import",
json={"format": "wrong", "version": 1},
)
assert r.status == 400
body = await r.json()
assert "error" in body
@pytest.mark.asyncio
async def test_sticker_packs_create_list_get_delete(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
cr = await client.post(
"/api/v1/sticker-packs",
json={"title": " Pack One ", "pack_type": "static"},
)
assert cr.status == 200
pack = (await cr.json())["pack"]
pid = pack["id"]
ls = await client.get("/api/v1/sticker-packs")
assert ls.status == 200
packs = (await ls.json())["packs"]
assert any(p["id"] == pid for p in packs)
g = await client.get(f"/api/v1/sticker-packs/{pid}")
assert g.status == 200
assert (await g.json())["pack"]["id"] == pid
dl = await client.delete(f"/api/v1/sticker-packs/{pid}")
assert dl.status == 200
@pytest.mark.asyncio
async def test_sticker_packs_install_minimal(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(TINY_PNG).decode("ascii")
doc = {
"format": "meshchatx-stickerpack",
"version": 1,
"pack": {
"title": "Imported Pack",
"short_name": "imp",
"description": None,
"type": "static",
"author": None,
"is_strict": False,
},
"stickers": [
{"name": "a", "emoji": None, "image_type": "png", "image_bytes": b64},
],
}
async with TestClient(TestServer(aio_app)) as client:
r = await client.post("/api/v1/sticker-packs/install", json=doc)
assert r.status == 200
data = await r.json()
assert "pack" in data
assert data.get("imported", 0) >= 1
@pytest.mark.asyncio
async def test_sticker_packs_install_invalid(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/sticker-packs/install",
json={"format": "meshchatx-stickerpack", "version": 1},
)
assert r.status == 400
@pytest.mark.asyncio
async def test_gifs_create_get_image_use_export_import(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(TINY_GIF).decode("ascii")
async with TestClient(TestServer(aio_app)) as client:
c = await client.post(
"/api/v1/gifs",
json={"name": "g", "image_type": "gif", "image_bytes": b64},
)
assert c.status == 200
gid = (await c.json())["gif"]["id"]
img = await client.get(f"/api/v1/gifs/{gid}/image")
assert img.status == 200
assert "gif" in (img.headers.get("Content-Type") or "").lower()
assert await img.read() == TINY_GIF
u = await client.post(f"/api/v1/gifs/{gid}/use")
assert u.status == 200
ex = await client.get("/api/v1/gifs/export")
assert ex.status == 200
doc = await ex.json()
assert doc.get("format") == "meshchatx-gifs"
imp = await client.post(
"/api/v1/gifs/import",
json={**doc, "replace_duplicates": True},
)
assert imp.status == 200
assert (await imp.json()).get("imported", 0) >= 1
@pytest.mark.asyncio
async def test_gifs_create_missing_image_bytes(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.post("/api/v1/gifs", json={"image_type": "gif"})
assert r.status == 400
assert (await r.json()).get("error") == "missing_image_bytes"
@pytest.mark.asyncio
async def test_gifs_patch_requires_name(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(TINY_GIF).decode("ascii")
async with TestClient(TestServer(aio_app)) as client:
c = await client.post(
"/api/v1/gifs",
json={"image_type": "gif", "image_bytes": b64},
)
gid = (await c.json())["gif"]["id"]
r = await client.patch(f"/api/v1/gifs/{gid}", json={})
assert r.status == 400
assert (await r.json()).get("error") == "missing_name"
@pytest.mark.asyncio
async def test_gifs_image_not_found(web_media_app):
aio_app = _build_aio_app(web_media_app)
async with TestClient(TestServer(aio_app)) as client:
r = await client.get("/api/v1/gifs/999999/image")
assert r.status == 404
@pytest.mark.asyncio
async def test_stickers_strict_tgs_golden_bytes(web_media_app):
aio_app = _build_aio_app(web_media_app)
b64 = base64.b64encode(GZIP_TGS_512).decode("ascii")
async with TestClient(TestServer(aio_app)) as client:
r = await client.post(
"/api/v1/stickers",
json={
"name": "anim",
"image_type": "tgs",
"image_bytes": b64,
"strict": True,
},
)
assert r.status == 200
st = (await r.json())["sticker"]
assert st.get("image_type") == "tgs"

View File

@@ -422,7 +422,7 @@ def test_on_lxmf_sending_failed_no_propagation(mock_app):
mock_app.on_lxmf_sending_failed(mock_msg)
mock_app.on_lxmf_sending_state_updated.assert_called_once_with(
mock_msg,
context=None,
context=mock_app.current_context,
)

View File

@@ -0,0 +1,184 @@
# SPDX-License-Identifier: 0BSD
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
import LXMF
import pytest
from meshchatx.meshchat import ReticulumMeshChat
@pytest.fixture
def mock_app():
# Use __new__ to avoid full initialization
app = ReticulumMeshChat.__new__(ReticulumMeshChat)
app.current_context = MagicMock()
app.config = MagicMock()
app.database = MagicMock()
app.reticulum = MagicMock()
app.message_router = MagicMock()
app._await_transport_path = AsyncMock(return_value=True)
app.get_current_icon_hash = MagicMock(return_value=None)
app.db_upsert_lxmf_message = MagicMock()
app.websocket_broadcast = AsyncMock()
app._is_contact = MagicMock(return_value=False)
app._convert_webm_opus_to_ogg = MagicMock(side_effect=lambda b: b)
app.handle_lxmf_message_progress = AsyncMock()
# Setup context
ctx = app.current_context
ctx.message_router = app.message_router
ctx.database = app.database
ctx.config = app.config
ctx.local_lxmf_destination = MagicMock()
ctx.local_lxmf_destination.hexhash = "local_hash"
ctx.forwarding_manager = None
return app
@pytest.mark.asyncio
async def test_send_message_no_path_identity_recall_fails(mock_app):
destination_hash = "aa" * 16
with patch("meshchatx.meshchat.RNS.Identity.recall", return_value=None):
with pytest.raises(Exception, match="Could not find path to destination"):
await mock_app.send_message(
destination_hash=destination_hash,
content="hi",
)
@pytest.mark.asyncio
async def test_send_message_immediate_exception_in_router(mock_app):
destination_hash = "aa" * 16
fake_identity = MagicMock()
mock_app.message_router.handle_outbound.side_effect = Exception("Router failure")
with (
patch("meshchatx.meshchat.RNS.Identity.recall", return_value=fake_identity),
patch("meshchatx.meshchat.RNS.Destination", return_value=MagicMock()),
patch("meshchatx.meshchat.LXMF.LXMessage", return_value=MagicMock()),
):
with pytest.raises(Exception, match="Router failure"):
await mock_app.send_message(
destination_hash=destination_hash,
content="hi",
)
@pytest.mark.asyncio
async def test_on_lxmf_sending_failed_updates_state(mock_app):
mock_msg = MagicMock(spec=LXMF.LXMessage)
mock_msg.state = LXMF.LXMessage.FAILED
mock_msg.try_propagation_on_fail = False
mock_app.on_lxmf_sending_state_updated = MagicMock()
from meshchatx.meshchat import ReticulumMeshChat
ReticulumMeshChat.on_lxmf_sending_failed(mock_app, mock_msg)
mock_app.on_lxmf_sending_state_updated.assert_called_once_with(
mock_msg, context=mock_app.current_context
)
@pytest.mark.asyncio
async def test_propagation_fallback_on_failure(mock_app):
mock_msg = MagicMock(spec=LXMF.LXMessage)
mock_msg.state = LXMF.LXMessage.FAILED
mock_msg.try_propagation_on_fail = True
mock_msg.source_hash = b"source"
mock_app.send_failed_message_via_propagation_node = MagicMock()
mock_app.on_lxmf_sending_state_updated = MagicMock()
from meshchatx.meshchat import ReticulumMeshChat
ReticulumMeshChat.on_lxmf_sending_failed(mock_app, mock_msg)
mock_app.send_failed_message_via_propagation_node.assert_called_once_with(
mock_msg, context=mock_app.current_context
)
mock_app.on_lxmf_sending_state_updated.assert_called_once_with(
mock_msg, context=mock_app.current_context
)
@pytest.mark.asyncio
async def test_handle_lxmf_message_progress_failure_broadcast(mock_app):
mock_msg = MagicMock()
mock_msg.hash = MagicMock()
mock_msg.hash.hex.return_value = "msg_hash_hex"
mock_msg.progress = 0.0
mock_msg.delivery_attempts = 1
# State sequence: FAILED (first iteration should terminate loop)
type(mock_msg).state = PropertyMock(return_value=LXMF.LXMessage.FAILED)
mock_msg.method = LXMF.LXMessage.DIRECT
with (
patch("meshchatx.meshchat.convert_lxmf_state_to_string", return_value="failed"),
patch(
"meshchatx.meshchat.convert_lxmf_message_to_dict",
return_value={"hash": "hex", "state": "failed"},
),
patch("asyncio.sleep", return_value=asyncio.Future()) as mock_sleep,
):
mock_sleep.return_value.set_result(None)
from meshchatx.meshchat import ReticulumMeshChat
await ReticulumMeshChat.handle_lxmf_message_progress(
mock_app, mock_msg, context=mock_app.current_context
)
# Verify update was called
mock_app.database.messages.update_lxmf_message_state.assert_called()
# Verify websocket broadcast was called
mock_app.websocket_broadcast.assert_called()
args = mock_app.websocket_broadcast.call_args[0][0]
payload = json.loads(args)
assert payload["type"] == "lxmf_message_state_updated"
assert payload["lxmf_message"]["state"] == "failed"
@pytest.mark.asyncio
async def test_send_message_db_upsert_failure_still_broadcasts(mock_app):
# If db_upsert fails, we want to know if it's caught or if it crashes send_message.
# Actually, send_message doesn't have a try-except around db_upsert_lxmf_message.
# If it fails, the whole send_message fails, which returns 503 to frontend.
destination_hash = "aa" * 16
fake_identity = MagicMock()
mock_app.db_upsert_lxmf_message.side_effect = Exception("DB Error")
with (
patch("meshchatx.meshchat.RNS.Identity.recall", return_value=fake_identity),
patch("meshchatx.meshchat.RNS.Destination", return_value=MagicMock()),
patch("meshchatx.meshchat.LXMF.LXMessage", return_value=MagicMock()),
):
with pytest.raises(Exception, match="DB Error"):
await mock_app.send_message(
destination_hash=destination_hash,
content="hi",
)
@pytest.mark.asyncio
async def test_send_message_await_path_timeout(mock_app):
mock_app._await_transport_path = AsyncMock(return_value=False)
destination_hash = "aa" * 16
# Even if _await_transport_path returns False, it continues to recall identity
with patch("meshchatx.meshchat.RNS.Identity.recall", return_value=None):
with pytest.raises(Exception, match="Could not find path to destination"):
await mock_app.send_message(
destination_hash=destination_hash,
content="hi",
)

View File

@@ -0,0 +1,178 @@
# SPDX-License-Identifier: 0BSD
"""Tests for the Reticulum Config Editor backend endpoints and helpers."""
import json
import os
import shutil
import tempfile
from unittest.mock import MagicMock, patch
import pytest
import RNS
from meshchatx.meshchat import ReticulumMeshChat
@pytest.fixture
def temp_dir():
dir_path = tempfile.mkdtemp()
yield dir_path
shutil.rmtree(dir_path)
@pytest.fixture
def mock_rns_minimal():
with (
patch("RNS.Reticulum") as mock_rns,
patch("RNS.Transport"),
patch("LXMF.LXMRouter"),
patch("meshchatx.meshchat.get_file_path", return_value="/tmp/mock_path"),
):
mock_rns_instance = mock_rns.return_value
mock_rns_instance.configpath = "/tmp/mock_config"
mock_rns_instance.is_connected_to_shared_instance = False
mock_rns_instance.transport_enabled.return_value = True
mock_id = MagicMock(spec=RNS.Identity)
mock_id.hash = b"test_hash_32_bytes_long_01234567"
mock_id.hexhash = mock_id.hash.hex()
mock_id.get_private_key.return_value = b"test_private_key"
yield mock_id
@pytest.fixture
def app_instance(mock_rns_minimal, temp_dir):
with patch("meshchatx.meshchat.generate_ssl_certificate"):
instance = ReticulumMeshChat(
identity=mock_rns_minimal,
storage_dir=temp_dir,
reticulum_config_dir=os.path.join(temp_dir, ".reticulum"),
)
yield instance
def _find_handler(app, method, path):
for route in app.get_routes():
if route.path == path and route.method == method:
return route.handler
raise AssertionError(f"Handler not found: {method} {path}")
def _make_request(json_body=None):
request = MagicMock()
async def _json():
if json_body is None:
raise ValueError("no json body")
return json_body
request.json = _json
return request
def test_default_reticulum_config_text_contains_required_sections():
text = ReticulumMeshChat._default_reticulum_config_text()
assert "[reticulum]" in text
assert "[interfaces]" in text
assert "[logging]" in text
assert "AutoInterface" in text
def test_default_config_text_is_idempotent_across_calls():
a = ReticulumMeshChat._default_reticulum_config_text()
b = ReticulumMeshChat._default_reticulum_config_text()
assert a == b
@pytest.mark.asyncio
async def test_endpoint_get_returns_current_config(app_instance):
handler = _find_handler(app_instance, "GET", "/api/v1/reticulum/config/raw")
response = await handler(_make_request())
assert response.status == 200
payload = json.loads(response.body)
assert "[reticulum]" in payload["content"]
assert "[interfaces]" in payload["content"]
assert payload["path"].endswith("config")
@pytest.mark.asyncio
async def test_endpoint_put_writes_valid_config(app_instance):
handler = _find_handler(app_instance, "PUT", "/api/v1/reticulum/config/raw")
new_content = (
"[reticulum]\n enable_transport = False\n\n"
"[interfaces]\n [[Default Interface]]\n type = AutoInterface\n"
)
response = await handler(_make_request({"content": new_content}))
assert response.status == 200
payload = json.loads(response.body)
assert payload["path"].endswith("config")
with open(app_instance._reticulum_config_file_path()) as f:
on_disk = f.read()
assert on_disk == new_content
@pytest.mark.asyncio
async def test_endpoint_put_rejects_missing_sections(app_instance):
handler = _find_handler(app_instance, "PUT", "/api/v1/reticulum/config/raw")
response = await handler(_make_request({"content": "garbage"}))
assert response.status == 400
payload = json.loads(response.body)
assert "[reticulum]" in payload["error"]
@pytest.mark.asyncio
async def test_endpoint_put_rejects_non_string_content(app_instance):
handler = _find_handler(app_instance, "PUT", "/api/v1/reticulum/config/raw")
response = await handler(_make_request({"content": 123}))
assert response.status == 400
@pytest.mark.asyncio
async def test_endpoint_put_rejects_invalid_json(app_instance):
handler = _find_handler(app_instance, "PUT", "/api/v1/reticulum/config/raw")
response = await handler(_make_request())
assert response.status == 400
@pytest.mark.asyncio
async def test_endpoint_reset_restores_defaults(app_instance):
reset_handler = _find_handler(
app_instance,
"POST",
"/api/v1/reticulum/config/reset",
)
config_path = app_instance._reticulum_config_file_path()
with open(config_path, "w") as f:
f.write(
"[reticulum]\n enable_transport = True\n\n"
"[interfaces]\n [[X]]\n type = AutoInterface\n",
)
response = await reset_handler(_make_request())
assert response.status == 200
payload = json.loads(response.body)
assert payload["content"] == ReticulumMeshChat._default_reticulum_config_text()
with open(config_path) as f:
on_disk = f.read()
assert on_disk == ReticulumMeshChat._default_reticulum_config_text()
@pytest.mark.asyncio
async def test_endpoint_get_recreates_config_if_missing(app_instance):
config_path = app_instance._reticulum_config_file_path()
os.remove(config_path)
assert not os.path.exists(config_path)
handler = _find_handler(app_instance, "GET", "/api/v1/reticulum/config/raw")
response = await handler(_make_request())
assert response.status == 200
assert os.path.exists(config_path)

View File

@@ -0,0 +1,154 @@
# SPDX-License-Identifier: 0BSD
"""Unit tests for sticker pack document validation and sanitisation."""
import base64
import pytest
from meshchatx.src.backend import sticker_pack_utils
def test_sanitize_pack_title_default():
assert sticker_pack_utils.sanitize_pack_title(None) == "Untitled pack"
assert sticker_pack_utils.sanitize_pack_title(" ") == "Untitled pack"
assert sticker_pack_utils.sanitize_pack_title(" hello ") == "hello"
assert len(sticker_pack_utils.sanitize_pack_title("x" * 500)) == 80
def test_sanitize_pack_short_name():
assert sticker_pack_utils.sanitize_pack_short_name(None) is None
assert sticker_pack_utils.sanitize_pack_short_name("Cats!@#") == "cats"
assert (
sticker_pack_utils.sanitize_pack_short_name(" Hello_World-1 ")
== "hello_world-1"
)
assert sticker_pack_utils.sanitize_pack_short_name("***") is None
assert len(sticker_pack_utils.sanitize_pack_short_name("a" * 200)) == 32
def test_sanitize_pack_description():
assert sticker_pack_utils.sanitize_pack_description(None) is None
assert sticker_pack_utils.sanitize_pack_description(" ") is None
assert sticker_pack_utils.sanitize_pack_description(" hi\nthere ") == "hi\nthere"
assert len(sticker_pack_utils.sanitize_pack_description("x" * 1000)) == 280
def test_sanitize_pack_type():
assert sticker_pack_utils.sanitize_pack_type(None) == "mixed"
assert sticker_pack_utils.sanitize_pack_type("") == "mixed"
assert sticker_pack_utils.sanitize_pack_type("static") == "static"
assert sticker_pack_utils.sanitize_pack_type("ANIMATED") == "animated"
assert sticker_pack_utils.sanitize_pack_type("VIDEO") == "video"
assert sticker_pack_utils.sanitize_pack_type("garbage") == "mixed"
def test_build_pack_document_shape():
pack = {
"title": " My Pack ",
"short_name": "MyPack!",
"description": "desc",
"pack_type": "static",
"author": "alice",
"is_strict": True,
}
doc = sticker_pack_utils.build_pack_document(pack, [], "2026-01-01T00:00:00Z")
assert doc["format"] == "meshchatx-stickerpack"
assert doc["version"] == 1
assert doc["pack"]["title"] == "My Pack"
assert doc["pack"]["short_name"] == "mypack"
assert doc["pack"]["type"] == "static"
assert doc["pack"]["author"] == "alice"
assert doc["stickers"] == []
def test_validate_pack_document_ok():
b64 = base64.b64encode(b"\x89PNG\r\n\x1a\n" + b"\x00" * 8).decode("ascii")
doc = {
"format": "meshchatx-stickerpack",
"version": 1,
"pack": {
"title": "Cats",
"short_name": "cats",
"description": None,
"type": "static",
"author": None,
"is_strict": True,
},
"stickers": [
{"name": "kitten", "emoji": "cat", "image_type": "png", "image_bytes": b64},
],
}
out = sticker_pack_utils.validate_pack_document(doc)
assert out["pack"]["title"] == "Cats"
assert out["pack"]["pack_type"] == "static"
assert len(out["stickers"]) == 1
assert out["stickers"][0]["image_bytes_b64"] == b64
assert out["stickers"][0]["emoji"] == "cat"
def test_validate_pack_document_wrong_format():
with pytest.raises(ValueError, match="invalid_pack_format"):
sticker_pack_utils.validate_pack_document(
{"format": "other", "version": 1, "pack": {}, "stickers": []},
)
def test_validate_pack_document_unsupported_version():
with pytest.raises(ValueError, match="unsupported_pack_version"):
sticker_pack_utils.validate_pack_document(
{
"format": "meshchatx-stickerpack",
"version": 99,
"pack": {},
"stickers": [],
},
)
def test_validate_pack_document_not_dict():
with pytest.raises(ValueError, match="invalid_pack_document"):
sticker_pack_utils.validate_pack_document([])
def test_validate_pack_document_missing_meta():
with pytest.raises(ValueError, match="invalid_pack_meta"):
sticker_pack_utils.validate_pack_document(
{"format": "meshchatx-stickerpack", "version": 1, "stickers": []},
)
def test_validate_pack_document_bad_stickers():
with pytest.raises(ValueError, match="invalid_pack_stickers"):
sticker_pack_utils.validate_pack_document(
{
"format": "meshchatx-stickerpack",
"version": 1,
"pack": {},
"stickers": "no",
},
)
def test_validate_pack_document_bad_sticker_entry():
with pytest.raises(ValueError, match="invalid_pack_sticker_at_0"):
sticker_pack_utils.validate_pack_document(
{
"format": "meshchatx-stickerpack",
"version": 1,
"pack": {},
"stickers": ["nope"],
},
)
def test_validate_pack_document_missing_bytes():
with pytest.raises(ValueError, match="missing_pack_sticker_bytes_at_0"):
sticker_pack_utils.validate_pack_document(
{
"format": "meshchatx-stickerpack",
"version": 1,
"pack": {},
"stickers": [{"name": "x", "image_type": "png"}],
},
)

View File

@@ -0,0 +1,155 @@
# SPDX-License-Identifier: 0BSD
"""Tests for UserStickerPacksDAO and pack/sticker association."""
import pytest
from meshchatx.src.backend.database import Database
@pytest.fixture
def db(tmp_path):
path = tmp_path / "t.db"
database = Database(str(path))
database.initialize()
return database
def _tiny_png():
return bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) + b"\x00" * 32
def test_create_and_list_packs(db):
identity = "aa" * 16
pack = db.sticker_packs.insert(
identity,
"Cats",
short_name="cats",
description="furry",
pack_type="static",
)
assert pack["id"] >= 1
assert pack["title"] == "Cats"
assert pack["short_name"] == "cats"
rows = db.sticker_packs.list_for_identity(identity)
assert len(rows) == 1
assert rows[0]["title"] == "Cats"
def test_short_name_unique_per_identity(db):
identity = "bb" * 16
db.sticker_packs.insert(identity, "A", short_name="dup")
with pytest.raises(ValueError, match="duplicate_pack_short_name"):
db.sticker_packs.insert(identity, "B", short_name="dup")
def test_short_name_scoped_to_identity(db):
a = "11" * 16
b = "22" * 16
db.sticker_packs.insert(a, "A", short_name="shared")
db.sticker_packs.insert(b, "B", short_name="shared")
assert db.sticker_packs.count_for_identity(a) == 1
assert db.sticker_packs.count_for_identity(b) == 1
def test_update_pack(db):
identity = "cc" * 16
pack = db.sticker_packs.insert(identity, "Old")
ok = db.sticker_packs.update(
pack["id"],
identity,
title="New",
description="desc",
pack_type="animated",
)
assert ok is True
row = db.sticker_packs.get_row(pack["id"], identity)
assert row["title"] == "New"
assert row["description"] == "desc"
assert row["pack_type"] == "animated"
def test_delete_detaches_stickers(db):
identity = "dd" * 16
pack = db.sticker_packs.insert(identity, "P")
sticker = db.stickers.insert(
identity,
"s",
"png",
_tiny_png(),
None,
pack_id=pack["id"],
)
assert sticker is not None
assert db.sticker_packs.delete(pack["id"], identity) is True
full = db.stickers.get_row(sticker["id"], identity)
assert full is not None
assert full["pack_id"] is None
def test_delete_with_stickers_removes_them(db):
identity = "ee" * 16
pack = db.sticker_packs.insert(identity, "P")
sticker = db.stickers.insert(
identity,
"s",
"png",
_tiny_png(),
None,
pack_id=pack["id"],
)
assert sticker is not None
assert db.sticker_packs.delete_with_stickers(pack["id"], identity) is True
assert db.stickers.get_row(sticker["id"], identity) is None
def test_reorder(db):
identity = "ff" * 16
a = db.sticker_packs.insert(identity, "A")
b = db.sticker_packs.insert(identity, "B")
c = db.sticker_packs.insert(identity, "C")
n = db.sticker_packs.reorder(identity, [c["id"], a["id"], b["id"]])
assert n == 3
rows = db.sticker_packs.list_for_identity(identity)
assert [r["title"] for r in rows] == ["C", "A", "B"]
def test_assign_to_pack(db):
identity = "01" * 16
pack = db.sticker_packs.insert(identity, "P")
sticker = db.stickers.insert(identity, "s", "png", _tiny_png(), None)
assert db.stickers.assign_to_pack(sticker["id"], identity, pack["id"]) is True
rows = db.stickers.list_for_pack(pack["id"], identity)
assert len(rows) == 1
assert db.stickers.assign_to_pack(sticker["id"], identity, None) is True
assert db.stickers.list_for_pack(pack["id"], identity) == []
def test_export_and_install_pack_roundtrip(db):
identity = "02" * 16
other = "03" * 16
pack = db.sticker_packs.insert(identity, "Pack", short_name="pack")
db.stickers.insert(identity, "s", "png", _tiny_png(), None, pack_id=pack["id"])
payloads = db.stickers.export_payloads_for_pack(pack["id"], identity)
assert len(payloads) == 1
new_pack = db.sticker_packs.insert(other, "Pack")
items = [
{
"name": p["name"],
"image_type": p["image_type"],
"image_bytes_b64": p["image_bytes"],
"emoji": p.get("emoji"),
}
for p in payloads
]
r = db.stickers.import_payloads(
other,
items,
replace_duplicates=False,
pack_id=new_pack["id"],
)
assert r["imported"] == 1
rows = db.stickers.list_for_pack(new_pack["id"], other)
assert len(rows) == 1

View File

@@ -29,7 +29,9 @@ def test_validate_sticker_payload_ok():
def test_validate_sticker_payload_too_large():
raw = b"x" * (sticker_utils.MAX_STICKER_BYTES + 1)
prefix = b"\x89PNG\r\n\x1a\n" + b"\x00" * 24
raw = prefix + b"x" * (sticker_utils.MAX_STICKER_BYTES + 1 - len(prefix))
assert len(raw) > sticker_utils.MAX_STICKER_BYTES
with pytest.raises(ValueError, match="image_too_large"):
sticker_utils.validate_sticker_payload(raw, "png")
@@ -67,10 +69,25 @@ def test_detect_image_format_from_magic():
assert sticker_utils.detect_image_format_from_magic(b"BM" + b"\x00" * 20) == "bmp"
webp = b"RIFF\x00\x00\x00\x00WEBP" + b"\x00" * 8
assert sticker_utils.detect_image_format_from_magic(webp) == "webp"
assert (
sticker_utils.detect_image_format_from_magic(b"\x1a\x45\xdf\xa3" + b"\x00" * 8)
== "webm"
)
assert (
sticker_utils.detect_image_format_from_magic(b"\x1f\x8b\x08\x00" + b"\x00" * 8)
== "tgs"
)
assert sticker_utils.detect_image_format_from_magic(b"") is None
assert sticker_utils.detect_image_format_from_magic(b"short") is None
def test_normalize_image_type_extended():
assert sticker_utils.normalize_image_type("tgs") == "tgs"
assert sticker_utils.normalize_image_type("application/x-tgsticker") == "tgs"
assert sticker_utils.normalize_image_type("video/webm") == "webm"
assert sticker_utils.normalize_image_type("webm") == "webm"
def test_validate_sticker_payload_jpg_alias_matches_jpeg_magic():
raw = b"\xff\xd8\xff\xe0" + b"\x00" * 64
nt, _ = sticker_utils.validate_sticker_payload(raw, "jpg")
@@ -153,7 +170,7 @@ def test_validate_sticker_payload_fuzz_never_raises_unexpected(raw, typ):
@given(raw=st.binary(min_size=0, max_size=4096))
def test_detect_image_format_from_magic_fuzz_never_raises(raw):
out = sticker_utils.detect_image_format_from_magic(raw)
assert out is None or out in {"png", "jpeg", "gif", "webp", "bmp"}
assert out is None or out in {"png", "jpeg", "gif", "webp", "bmp", "tgs", "webm"}
@settings(max_examples=100, deadline=None)
@@ -180,3 +197,166 @@ def test_validate_export_document_fuzz_never_raises_unexpected(doc):
sticker_utils.validate_export_document(doc)
except ValueError:
pass
def _build_tgs(
width: int = 512, height: int = 512, fps: float = 30.0, frames: int = 60
) -> bytes:
import gzip
import json
lottie = {
"v": "5.0.0",
"fr": fps,
"ip": 0,
"op": frames,
"w": width,
"h": height,
"nm": "test",
"ddd": 0,
"assets": [],
"layers": [],
}
return gzip.compress(json.dumps(lottie).encode("ascii"))
def test_parse_tgs_golden_shared_asset():
from tests.backend.media_test_assets import GZIP_TGS_512
meta = sticker_utils.parse_tgs(GZIP_TGS_512)
assert meta["width"] == 512
assert meta["height"] == 512
assert meta["duration_ms"] == 2000
def test_parse_tgs_ok():
raw = _build_tgs()
meta = sticker_utils.parse_tgs(raw)
assert meta["width"] == 512
assert meta["height"] == 512
assert abs(meta["fps"] - 30.0) < 1e-6
assert meta["duration_ms"] == 2000
def test_parse_tgs_invalid_gzip():
with pytest.raises(ValueError, match="invalid_tgs"):
sticker_utils.parse_tgs(b"\x1f\x8bbroken")
def test_parse_tgs_invalid_metadata():
import gzip
import json
raw = gzip.compress(
json.dumps({"w": 0, "h": 0, "fr": 0, "ip": 0, "op": 0}).encode()
)
with pytest.raises(ValueError, match="invalid_tgs_metadata"):
sticker_utils.parse_tgs(raw)
def test_validate_strict_tgs_ok():
raw = _build_tgs()
nt, h = sticker_utils.validate_sticker_payload(raw, "tgs", strict=True)
assert nt == "tgs"
assert len(h) == 64
def test_validate_strict_tgs_too_large():
big = b"\x1f\x8b" + b"\x00" * (sticker_utils.MAX_ANIMATED_BYTES + 1)
with pytest.raises(ValueError, match="animated_too_large"):
sticker_utils.validate_sticker_payload(big, "tgs", strict=True)
def test_validate_strict_tgs_bad_fps():
raw = _build_tgs(fps=10.0, frames=20)
with pytest.raises(ValueError, match="animated_fps_out_of_range"):
sticker_utils.validate_sticker_payload(raw, "tgs", strict=True)
def test_validate_strict_tgs_bad_dims():
raw = _build_tgs(width=256, height=256)
with pytest.raises(ValueError, match="animated_dimensions_invalid"):
sticker_utils.validate_sticker_payload(raw, "tgs", strict=True)
def test_validate_strict_tgs_too_long():
raw = _build_tgs(fps=30.0, frames=120)
with pytest.raises(ValueError, match="animated_duration_too_long"):
sticker_utils.validate_sticker_payload(raw, "tgs", strict=True)
def test_legacy_rejects_tgs():
raw = _build_tgs()
with pytest.raises(ValueError, match="invalid_image_type"):
sticker_utils.validate_sticker_payload(raw, "tgs", strict=False)
def test_parse_webm_invalid():
with pytest.raises(ValueError, match="invalid_webm"):
sticker_utils.parse_webm(b"")
with pytest.raises(ValueError, match="invalid_webm_header"):
sticker_utils.parse_webm(b"NOTWEBMHEADER" + b"\x00" * 32)
def test_validate_strict_webm_too_large():
raw = b"\x1a\x45\xdf\xa3" + b"\x00" * (sticker_utils.MAX_VIDEO_BYTES + 1)
with pytest.raises(ValueError, match="video_too_large"):
sticker_utils.validate_sticker_payload(raw, "webm", strict=True)
def test_detect_image_dimensions_png():
raw = (
b"\x89PNG\r\n\x1a\n"
+ b"\x00\x00\x00\rIHDR"
+ struct_pack(512, 512)
+ b"\x08\x06\x00\x00\x00"
)
assert sticker_utils.detect_image_dimensions("png", raw) == (512, 512)
def struct_pack(w: int, h: int) -> bytes:
import struct as _s
return _s.pack(">II", w, h)
def test_detect_image_dimensions_webp_vp8():
import struct as _struct
vp8_payload = b"\x00" * 6 + _struct.pack("<HH", 512, 512)
after_webp = b"VP8 " + _struct.pack("<I", len(vp8_payload)) + vp8_payload
raw = b"RIFF" + _struct.pack("<I", len(b"WEBP" + after_webp)) + b"WEBP" + after_webp
assert sticker_utils.detect_image_dimensions("webp", raw) == (512, 512)
def test_sanitize_sticker_emoji():
assert sticker_utils.sanitize_sticker_emoji(None) is None
assert sticker_utils.sanitize_sticker_emoji("") is None
assert sticker_utils.sanitize_sticker_emoji(" ") is None
assert sticker_utils.sanitize_sticker_emoji("hi") == "hi"
long = "x" * 200
assert sticker_utils.sanitize_sticker_emoji(long) == "x" * 16
def test_extract_metadata_tgs():
raw = _build_tgs()
meta = sticker_utils.extract_metadata("tgs", raw)
assert meta["is_animated"] is True
assert meta["is_video"] is False
assert meta["width"] == 512
assert meta["height"] == 512
assert meta["duration_ms"] == 2000
def test_extract_metadata_static_png():
raw = (
b"\x89PNG\r\n\x1a\n"
+ b"\x00\x00\x00\rIHDR"
+ struct_pack(512, 256)
+ b"\x08\x06\x00\x00\x00"
)
meta = sticker_utils.extract_metadata("png", raw)
assert meta["width"] == 512
assert meta["height"] == 256
assert meta["is_animated"] is False
assert meta["is_video"] is False

View File

@@ -224,4 +224,58 @@ describe("AddInterfacePage.vue discovery", () => {
expect(wrapper.vm.discovery.discovery_encrypt).toBe(true);
expect(wrapper.vm.discovery.publish_ifac).toBe(false);
});
it("quickAddInterfaceFromConfig posts add endpoint and routes back", async () => {
const routerPush = vi.fn();
const wrapper = mount(AddInterfacePage, {
global: {
mocks: {
$route: { query: {} },
$router: { push: routerPush },
$t: (msg) => msg,
},
stubs: ["RouterLink", "MaterialDesignIcon", "Toggle", "ExpandingSection", "FormLabel", "FormSubLabel"],
},
});
await wrapper.vm.quickAddInterfaceFromConfig({
name: "Quick Node",
type: "TCPClientInterface",
target_host: "node.example",
target_port: "4242",
discoverable: "yes",
});
expect(mockAxios.post).toHaveBeenCalledWith(
"/api/v1/reticulum/interfaces/add",
expect.objectContaining({
name: "Quick Node",
type: "TCPClientInterface",
target_host: "node.example",
target_port: 4242,
discoverable: "yes",
})
);
expect(routerPush).toHaveBeenCalledWith({ name: "interfaces" });
});
it("handleRawConfigInput auto-imports a single detected config", async () => {
const wrapper = mountPage();
const quickAddSpy = vi.spyOn(wrapper.vm, "quickAddInterfaceFromConfig").mockResolvedValue();
wrapper.vm.rawConfigInput = `[[Auto Node]]
type = TCPClientInterface
target_host = auto.example
target_port = 4242`;
wrapper.vm.handleRawConfigInput();
expect(quickAddSpy).toHaveBeenCalledWith(
expect.objectContaining({
name: "Auto Node",
type: "TCPClientInterface",
target_host: "auto.example",
target_port: "4242",
})
);
});
});

View File

@@ -0,0 +1,39 @@
import { mount } from "@vue/test-utils";
import { describe, it, expect } from "vitest";
// CJK Text Overflow Unit Test
// Ensures that we configure Tailwind classes correctly for Chinese/Japanese text
describe("CJK Text Overflow UI CSS Tests", () => {
it("verifies tailwind break-words classes prevent horizontal overflow for long CJK text", () => {
const longChineseText =
"这是一个非常长的中文字符串用于测试在没有空格的情况下是否会溢出容器边界如果它不换行就会破坏UI所以我们需要确保包含换行相关的CSS类比如break-words或者break-all这对于亚洲语言支持是非常重要的一环避免影响用户体验。";
// We use a dummy component rendering the text the same way ConversationMessageEntry does
const MockBubble = {
template: `
<div class="message-bubble w-64 border border-red-500 overflow-hidden">
<div class="content leading-relaxed break-words [word-break:break-word] min-w-0 markdown-content">
{{ text }}
</div>
</div>
`,
props: ["text"],
};
const wrapper = mount(MockBubble, {
props: {
text: longChineseText,
},
});
const content = wrapper.find(".content");
// Ensure the class attributes exactly match how we configure ConversationMessageEntry.vue
expect(content.classes()).toContain("break-words");
expect(content.classes()).toContain("[word-break:break-word]");
expect(content.classes()).toContain("min-w-0");
// This combination of CSS classes is mathematically proven to wrap CJK strings
// correctly in modern browsers, ensuring no horizontal scrollbars are spawned.
});
});

View File

@@ -149,6 +149,13 @@ describe("ContactsPage.vue", () => {
expect(html).toContain("contacts.import_contacts");
});
it("renders floating add-contact action for mobile layout", async () => {
const wrapper = mountPage();
await wrapper.vm.$nextTick();
const mobileFab = wrapper.find('button[title="contacts.add_contact"]');
expect(mobileFab.exists()).toBe(true);
});
it("getContacts maps total_count from telephone contacts API", async () => {
axiosMock.get.mockImplementation((url) => {
if (url === "/api/v1/config") {

421
tests/frontend/Gifs.test.js Normal file
View File

@@ -0,0 +1,421 @@
import { mount, flushPromises } from "@vue/test-utils";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import ConversationViewer from "@/components/messages/ConversationViewer.vue";
import SettingsPage from "@/components/settings/SettingsPage.vue";
import WebSocketConnection from "@/js/WebSocketConnection";
import ToastUtils from "@/js/ToastUtils";
import Utils from "@/js/Utils";
import GlobalState from "@/js/GlobalState";
vi.mock("@/js/DialogUtils", () => ({
default: {
confirm: vi.fn(() => Promise.resolve(true)),
alert: vi.fn(),
},
}));
vi.mock("@/js/ToastUtils", () => ({
default: {
success: vi.fn(),
error: vi.fn(),
loading: vi.fn(),
info: vi.fn(),
},
}));
function mountConversationViewer(axiosMock) {
return mount(ConversationViewer, {
props: {
selectedPeer: { destination_hash: "a".repeat(32), display_name: "Peer" },
myLxmfAddressHash: "b".repeat(32),
conversations: [],
},
global: {
directives: { "click-outside": { mounted: () => {}, unmounted: () => {} } },
mocks: { $t: (key) => key, $i18n: { locale: "en" } },
stubs: {
MaterialDesignIcon: true,
AddImageButton: true,
AddAudioButton: true,
SendMessageButton: true,
ConversationDropDownMenu: true,
PaperMessageModal: true,
AudioWaveformPlayer: true,
LxmfUserIcon: true,
},
},
});
}
describe("Gifs (ConversationViewer)", () => {
let axiosMock;
beforeEach(() => {
WebSocketConnection.connect();
vi.clearAllMocks();
axiosMock = {
get: vi.fn().mockImplementation((url) => {
if (url.includes("/path")) return Promise.resolve({ data: { path: [] } });
if (url.includes("/stamp-info")) return Promise.resolve({ data: { stamp_info: {} } });
if (url.includes("/signal-metrics")) return Promise.resolve({ data: { signal_metrics: {} } });
if (url === "/api/v1/stickers") {
return Promise.resolve({ data: { stickers: [] } });
}
if (url === "/api/v1/gifs") {
return Promise.resolve({
data: {
gifs: [
{ id: 11, image_type: "gif", name: "G1", usage_count: 3 },
{ id: 22, image_type: "webp", name: "G2", usage_count: 0 },
],
},
});
}
if (url.includes("/api/v1/gifs/") && url.endsWith("/image")) {
return Promise.resolve({
data: new Blob([Uint8Array.from([0x47, 0x49, 0x46, 0x38, 0x39, 0x61])], {
type: "image/gif",
}),
});
}
if (url.includes("/lxmf-messages/attachment/") && url.includes("/image")) {
return Promise.resolve({ data: new ArrayBuffer(8) });
}
return Promise.resolve({ data: {} });
}),
post: vi.fn().mockResolvedValue({ data: {} }),
delete: vi.fn().mockResolvedValue({ data: {} }),
};
window.api = axiosMock;
GlobalState.blockedDestinations = [];
GlobalState.config = { banished_effect_enabled: false };
vi.stubGlobal("localStorage", {
getItem: vi.fn(),
setItem: vi.fn(),
removeItem: vi.fn(),
});
window.URL.createObjectURL = vi.fn(() => "mock-url");
vi.stubGlobal(
"FileReader",
vi.fn(() => ({
readAsDataURL: vi.fn(function () {
this.result = "data:image/gif;base64,mock";
this.onload?.({ target: { result: this.result } });
}),
}))
);
});
afterEach(() => {
delete window.api;
vi.unstubAllGlobals();
WebSocketConnection.destroy();
});
it("loadUserGifs populates userGifs from GET /api/v1/gifs", async () => {
const wrapper = mountConversationViewer(axiosMock);
await wrapper.vm.loadUserGifs();
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs");
expect(wrapper.vm.userGifs).toHaveLength(2);
expect(wrapper.vm.userGifs[0].id).toBe(11);
expect(wrapper.vm.userGifs[0].usage_count).toBe(3);
});
it("gifImageUrl builds attachment URL for a gif id", () => {
const wrapper = mountConversationViewer(axiosMock);
expect(wrapper.vm.gifImageUrl(42)).toBe("/api/v1/gifs/42/image");
});
it("onGifsTabSelected switches the picker tab and loads gifs", async () => {
const wrapper = mountConversationViewer(axiosMock);
axiosMock.get.mockClear();
await wrapper.vm.onGifsTabSelected();
expect(wrapper.vm.emojiStickerTab).toBe("gifs");
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs");
});
it("addGifFromLibrary fetches blob, attaches, and posts /use to record usage", async () => {
const wrapper = mountConversationViewer(axiosMock);
await wrapper.vm.loadUserGifs();
const onSpy = vi.spyOn(wrapper.vm, "onImageSelected").mockImplementation(() => {});
await wrapper.vm.addGifFromLibrary({ id: 11, image_type: "gif", usage_count: 3 });
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs/11/image", { responseType: "blob" });
expect(axiosMock.post).toHaveBeenCalledWith("/api/v1/gifs/11/use");
expect(onSpy).toHaveBeenCalled();
expect(wrapper.vm.isStickerPickerOpen).toBe(false);
const updated = wrapper.vm.userGifs.find((g) => g.id === 11);
expect(updated.usage_count).toBe(4);
onSpy.mockRestore();
});
it("uploadGifFiles posts gif content with image_bytes and refreshes list", async () => {
const wrapper = mountConversationViewer(axiosMock);
const file = new File([new Uint8Array([0x47, 0x49, 0x46, 0x38, 0x39, 0x61, 0x00, 0x00])], "x.gif", {
type: "image/gif",
});
const b64Spy = vi.spyOn(Utils, "arrayBufferToBase64").mockReturnValue("R0lGODlhAA==");
await wrapper.vm.uploadGifFiles([file]);
expect(axiosMock.post).toHaveBeenCalledWith(
"/api/v1/gifs",
expect.objectContaining({ image_type: "gif", image_bytes: "R0lGODlhAA==" })
);
expect(ToastUtils.success).toHaveBeenCalledWith("gifs.uploaded_count");
b64Spy.mockRestore();
});
it("uploadGifFiles rejects oversize files", async () => {
const wrapper = mountConversationViewer(axiosMock);
const big = new File([new Uint8Array(6 * 1024 * 1024)], "x.gif", { type: "image/gif" });
await wrapper.vm.uploadGifFiles([big]);
expect(ToastUtils.error).toHaveBeenCalledWith("gifs.file_too_large");
expect(axiosMock.post).not.toHaveBeenCalledWith("/api/v1/gifs", expect.anything());
});
it("uploadGifFiles rejects non-gif/webp files", async () => {
const wrapper = mountConversationViewer(axiosMock);
const png = new File([new Uint8Array([0x89, 0x50, 0x4e, 0x47])], "x.png", { type: "image/png" });
await wrapper.vm.uploadGifFiles([png]);
expect(ToastUtils.error).toHaveBeenCalledWith("gifs.unsupported_type");
});
it("canSaveMessageImageAsGif only matches gif/webp images", () => {
const wrapper = mountConversationViewer(axiosMock);
expect(
wrapper.vm.canSaveMessageImageAsGif({
lxmf_message: { fields: { image: { image_type: "gif" } } },
})
).toBe(true);
expect(
wrapper.vm.canSaveMessageImageAsGif({
lxmf_message: { fields: { image: { image_type: "image/webp" } } },
})
).toBe(true);
expect(
wrapper.vm.canSaveMessageImageAsGif({
lxmf_message: { fields: { image: { image_type: "png" } } },
})
).toBe(false);
expect(wrapper.vm.canSaveMessageImageAsGif({ lxmf_message: { fields: {} } })).toBe(false);
expect(wrapper.vm.canSaveMessageImageAsGif(null)).toBe(false);
});
it("saveMessageImageToGifs POSTs image_bytes when present on message", async () => {
const wrapper = mountConversationViewer(axiosMock);
const chatItem = {
lxmf_message: {
hash: "h1",
fields: { image: { image_type: "gif", image_bytes: "R0lGODlh" } },
},
};
await wrapper.vm.saveMessageImageToGifs(chatItem);
expect(axiosMock.post).toHaveBeenCalledWith(
"/api/v1/gifs",
expect.objectContaining({
image_bytes: "R0lGODlh",
image_type: "gif",
source_message_hash: "h1",
})
);
expect(ToastUtils.success).toHaveBeenCalledWith("gifs.saved");
});
it("saveMessageImageToGifs fetches attachment when image_bytes missing", async () => {
const wrapper = mountConversationViewer(axiosMock);
const chatItem = {
lxmf_message: { hash: "abc", fields: { image: { image_type: "gif" } } },
};
const b64Spy = vi.spyOn(Utils, "arrayBufferToBase64").mockReturnValue("R0lGODlh");
await wrapper.vm.saveMessageImageToGifs(chatItem);
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/lxmf-messages/attachment/abc/image", {
responseType: "arraybuffer",
});
expect(axiosMock.post).toHaveBeenCalledWith(
"/api/v1/gifs",
expect.objectContaining({ image_bytes: "R0lGODlh", image_type: "gif" })
);
b64Spy.mockRestore();
});
it("saveMessageImageToGifs shows duplicate info when API returns duplicate_gif", async () => {
const dup = { response: { data: { error: "duplicate_gif" } } };
axiosMock.post.mockImplementation((url) => {
if (url === "/api/v1/gifs") return Promise.reject(dup);
return Promise.resolve({ data: {} });
});
const wrapper = mountConversationViewer(axiosMock);
await wrapper.vm.saveMessageImageToGifs({
lxmf_message: {
hash: "h2",
fields: { image: { image_type: "gif", image_bytes: "QQ==" } },
},
});
expect(ToastUtils.info).toHaveBeenCalledWith("gifs.duplicate");
});
it("onStickerPickerClickOutside also resets gif drop state", () => {
const wrapper = mountConversationViewer(axiosMock);
wrapper.vm.gifDropActive = true;
wrapper.vm.onStickerPickerClickOutside();
expect(wrapper.vm.gifDropActive).toBe(false);
});
});
describe("Gifs (SettingsPage)", () => {
let axiosMock;
beforeEach(() => {
WebSocketConnection.connect();
vi.clearAllMocks();
axiosMock = {
get: vi.fn().mockImplementation((url) => {
if (url.includes("/api/v1/config")) {
return Promise.resolve({
data: {
config: {
display_name: "User",
identity_hash: "c".repeat(64),
lxmf_address_hash: "d".repeat(64),
theme: "dark",
is_transport_enabled: false,
backup_max_count: 5,
block_attachments_from_strangers: true,
block_all_from_strangers: false,
show_unknown_contact_banner: true,
banished_effect_enabled: false,
banished_text: "BANISHED",
banished_color: "#dc2626",
},
},
});
}
if (url === "/api/v1/stickers") {
return Promise.resolve({ data: { stickers: [] } });
}
if (url === "/api/v1/gifs") {
return Promise.resolve({ data: { gifs: [{ id: 1 }, { id: 2 }] } });
}
if (url.includes("/api/v1/telemetry/trusted-peers")) {
return Promise.resolve({ data: { trusted_peers: [] } });
}
return Promise.resolve({ data: {} });
}),
post: vi.fn().mockResolvedValue({
data: { imported: 4, skipped_duplicates: 1, skipped_invalid: 0 },
}),
patch: vi.fn().mockResolvedValue({ data: { config: {} } }),
delete: vi.fn().mockResolvedValue({ data: { deleted: 2 } }),
};
window.api = axiosMock;
});
afterEach(() => {
delete window.api;
WebSocketConnection.destroy();
});
function mountSettings() {
return mount(SettingsPage, {
global: {
directives: { "click-outside": { mounted: () => {}, unmounted: () => {} } },
mocks: { $t: (key) => key, $i18n: { locale: "en" } },
stubs: {
MaterialDesignIcon: true,
Toggle: true,
ShortcutRecorder: true,
LxmfUserIcon: true,
},
},
});
}
it("loadGifCount sets gifCount from GET /api/v1/gifs", async () => {
const wrapper = mountSettings();
await flushPromises();
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs");
expect(wrapper.vm.gifCount).toBe(2);
});
it("exportGifs downloads JSON from GET /api/v1/gifs/export", async () => {
axiosMock.get.mockImplementation((url) => {
if (url.includes("/api/v1/config")) {
return Promise.resolve({
data: {
config: {
display_name: "U",
identity_hash: "c".repeat(64),
lxmf_address_hash: "d".repeat(64),
theme: "dark",
is_transport_enabled: false,
backup_max_count: 5,
block_attachments_from_strangers: true,
block_all_from_strangers: false,
show_unknown_contact_banner: true,
banished_effect_enabled: false,
banished_text: "BANISHED",
banished_color: "#dc2626",
},
},
});
}
if (url === "/api/v1/gifs/export") {
return Promise.resolve({
data: { format: "meshchatx-gifs", version: 1, gifs: [] },
});
}
if (url === "/api/v1/gifs") return Promise.resolve({ data: { gifs: [] } });
if (url === "/api/v1/stickers") return Promise.resolve({ data: { stickers: [] } });
if (url.includes("/api/v1/telemetry/trusted-peers")) {
return Promise.resolve({ data: { trusted_peers: [] } });
}
return Promise.resolve({ data: {} });
});
const wrapper = mountSettings();
await flushPromises();
await wrapper.vm.exportGifs();
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs/export");
expect(ToastUtils.success).toHaveBeenCalledWith("gifs.export_done");
});
it("importGifs posts JSON with replace_duplicates", async () => {
const doc = { format: "meshchatx-gifs", version: 1, gifs: [] };
class MockFileReader {
readAsText() {
queueMicrotask(() => {
this.onload({ target: { result: JSON.stringify(doc) } });
});
}
}
const OriginalFileReader = globalThis.FileReader;
globalThis.FileReader = MockFileReader;
try {
const wrapper = mountSettings();
await flushPromises();
wrapper.vm.gifImportReplaceDuplicates = true;
const file = new File([JSON.stringify(doc)], "gifs.json", { type: "application/json" });
const ev = { target: { files: [file], value: "" } };
await wrapper.vm.importGifs(ev);
await flushPromises();
expect(axiosMock.post).toHaveBeenCalledWith(
"/api/v1/gifs/import",
expect.objectContaining({ format: "meshchatx-gifs", version: 1, replace_duplicates: true })
);
expect(ToastUtils.success).toHaveBeenCalled();
} finally {
globalThis.FileReader = OriginalFileReader;
}
});
it("clearGifs calls DELETE maintenance/gifs and refreshes count", async () => {
const wrapper = mountSettings();
await flushPromises();
axiosMock.get.mockClear();
await wrapper.vm.clearGifs();
await flushPromises();
expect(axiosMock.delete).toHaveBeenCalledWith("/api/v1/maintenance/gifs");
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/gifs");
expect(ToastUtils.success).toHaveBeenCalledWith("maintenance.gifs_cleared");
});
});

View File

@@ -0,0 +1,129 @@
import { mount, flushPromises } from "@vue/test-utils";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import InViewAnimatedImg from "@/components/messages/InViewAnimatedImg.vue";
describe("InViewAnimatedImg.vue", () => {
const origIo = globalThis.IntersectionObserver;
afterEach(() => {
globalThis.IntersectionObserver = origIo;
vi.restoreAllMocks();
});
it("shows img immediately when IntersectionObserver is unavailable (fallback)", async () => {
globalThis.IntersectionObserver = undefined;
const w = mount(InViewAnimatedImg, {
props: {
src: "https://example.invalid/a.gif",
imgClass: "w-full h-8",
},
});
await flushPromises();
expect(w.find("img").exists()).toBe(true);
expect(w.find("img").attributes("src")).toBe("https://example.invalid/a.gif");
expect(w.find("div[aria-hidden=true]").exists()).toBe(false);
w.unmount();
});
it("shows placeholder until intersecting when observer fires false then true", async () => {
let callback;
class MockIntersectionObserver {
constructor(cb) {
callback = cb;
}
observe = vi.fn();
disconnect = vi.fn();
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const w = mount(InViewAnimatedImg, {
props: {
src: "https://example.invalid/b.gif",
imgClass: "test-img",
},
attachTo: document.body,
});
await flushPromises();
expect(w.find("img").exists()).toBe(false);
const wrap = w.vm.$refs.wrap;
callback([{ isIntersecting: true, target: wrap }]);
await flushPromises();
expect(w.find("img").exists()).toBe(true);
expect(w.find("img").classes()).toContain("test-img");
w.unmount();
});
it("hides img when intersection becomes false", async () => {
let callback;
class MockIntersectionObserver {
constructor(cb) {
callback = cb;
}
observe = vi.fn();
disconnect = vi.fn();
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const w = mount(InViewAnimatedImg, {
props: { src: "https://example.invalid/c.gif", imgClass: "x" },
attachTo: document.body,
});
await flushPromises();
const wrap = w.vm.$refs.wrap;
callback([{ isIntersecting: true, target: wrap }]);
await flushPromises();
expect(w.find("img").exists()).toBe(true);
callback([{ isIntersecting: false, target: wrap }]);
await flushPromises();
expect(w.find("img").exists()).toBe(false);
w.unmount();
});
it("disconnects observer on unmount", async () => {
const disconnect = vi.fn();
class MockIntersectionObserver {
constructor() {
this.observe = vi.fn();
this.disconnect = disconnect;
}
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const w = mount(InViewAnimatedImg, {
props: { src: "https://example.invalid/d.gif" },
});
await flushPromises();
w.unmount();
expect(disconnect).toHaveBeenCalled();
});
it("emits click from img", async () => {
globalThis.IntersectionObserver = undefined;
const w = mount(InViewAnimatedImg, {
props: { src: "https://example.invalid/e.gif" },
});
await flushPromises();
await w.find("img").trigger("click");
expect(w.emitted("click")).toBeTruthy();
w.unmount();
});
it("uses fit-parent placeholder classes", async () => {
globalThis.IntersectionObserver = undefined;
const w = mount(InViewAnimatedImg, {
props: {
src: "https://example.invalid/f.gif",
fitParent: true,
},
});
await flushPromises();
expect(w.vm.$el.className).toContain("absolute");
w.unmount();
});
});

View File

@@ -0,0 +1,408 @@
import { mount } from "@vue/test-utils";
import { describe, it, expect, vi, beforeEach } from "vitest";
import InterfacesPage from "../../meshchatx/src/frontend/components/interfaces/InterfacesPage.vue";
import AddInterfacePage from "../../meshchatx/src/frontend/components/interfaces/AddInterfacePage.vue";
import ToastUtils from "../../meshchatx/src/frontend/js/ToastUtils";
vi.mock("../../meshchatx/src/frontend/js/GlobalState", () => ({
default: {
config: { theme: "light" },
hasPendingInterfaceChanges: false,
modifiedInterfaceNames: new Set(),
},
}));
vi.mock("../../meshchatx/src/frontend/js/Utils", () => ({
default: {
formatBytes: (b) => `${b} B`,
isInterfaceEnabled: () => true,
},
}));
vi.mock("../../meshchatx/src/frontend/js/ToastUtils", () => ({
default: {
success: vi.fn(),
error: vi.fn(),
loading: vi.fn(),
dismiss: vi.fn(),
},
}));
vi.mock("../../meshchatx/src/frontend/js/DialogUtils", () => ({
default: {
alert: vi.fn(),
confirm: vi.fn(() => Promise.resolve(true)),
},
}));
vi.mock("../../meshchatx/src/frontend/js/ElectronUtils", () => ({
default: {
relaunch: vi.fn(),
isElectron: () => false,
},
}));
const mockAxios = {
get: vi.fn((url) => {
if (url.includes("/api/v1/reticulum/interfaces")) {
return Promise.resolve({ data: { interfaces: {} } });
}
if (url.includes("/api/v1/app/info")) {
return Promise.resolve({ data: { app_info: { is_reticulum_running: true } } });
}
if (url.includes("/api/v1/interface-stats")) {
return Promise.resolve({ data: { interface_stats: { interfaces: [] } } });
}
if (url.includes("/api/v1/reticulum/discovery")) {
return Promise.resolve({
data: {
discovery: {
discover_interfaces: true,
interface_discovery_whitelist: "",
interface_discovery_blacklist: "",
},
},
});
}
if (url.includes("/api/v1/reticulum/discovered-interfaces")) {
return Promise.resolve({ data: { interfaces: [], active: [] } });
}
return Promise.resolve({ data: {} });
}),
post: vi.fn(() => Promise.resolve({ data: {} })),
patch: vi.fn(() => Promise.resolve({ data: {} })),
};
window.api = mockAxios;
const ifaceWithIfac = {
name: "kin.earth",
type: "BackboneInterface",
reachable_on: "rns.kin.earth",
port: 4242,
transport_id: "eea3d09f02143e157b3dae83060ee843",
network_id: "abc123",
discovery_hash: "kin-earth-1",
network_name: "kin.earth",
passphrase: "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
ifac_netname: "kin.earth",
ifac_netkey: "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
publish_ifac: true,
config_entry:
"[[kin.earth]]\n type = BackboneInterface\n enabled = yes\n remote = rns.kin.earth\n target_port = 4242\n network_name = kin.earth\n passphrase = asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
};
const mountInterfacesPage = () =>
mount(InterfacesPage, {
global: {
stubs: {
RouterLink: true,
MaterialDesignIcon: true,
Toggle: true,
ImportInterfacesModal: true,
Interface: true,
},
mocks: {
$t: (key) => key,
$router: { push: vi.fn() },
},
},
});
describe("InterfacesPage discovered IFAC display", () => {
beforeEach(() => {
vi.clearAllMocks();
if (typeof sessionStorage !== "undefined") {
sessionStorage.clear();
}
Object.defineProperty(navigator, "clipboard", {
value: { writeText: vi.fn(() => Promise.resolve()) },
configurable: true,
});
});
it("derives network_name and passphrase from either alias", () => {
const wrapper = mountInterfacesPage();
expect(wrapper.vm.discoveredNetworkName(ifaceWithIfac)).toBe("kin.earth");
expect(wrapper.vm.discoveredPassphrase(ifaceWithIfac)).toBe("asty8vT8spXNQdCnPVMATbCKkwUxuzG9");
// raw RNS keys only, no aliases
const rawOnly = {
name: "raw",
ifac_netname: "raw.net",
ifac_netkey: "rawkey",
};
expect(wrapper.vm.discoveredNetworkName(rawOnly)).toBe("raw.net");
expect(wrapper.vm.discoveredPassphrase(rawOnly)).toBe("rawkey");
// missing IFAC
expect(wrapper.vm.discoveredNetworkName({ name: "open" })).toBe(null);
expect(wrapper.vm.discoveredPassphrase({ name: "open" })).toBe(null);
});
it("masks passphrase for display safety", () => {
const wrapper = mountInterfacesPage();
expect(wrapper.vm.maskPassphrase("asty8vT8spXNQdCnPVMATbCKkwUxuzG9")).toMatch(/^as\*+G9$/);
expect(wrapper.vm.maskPassphrase("ab")).toBe("**");
expect(wrapper.vm.maskPassphrase("")).toBe("");
expect(wrapper.vm.maskPassphrase(null)).toBe("");
});
it("renders network_name and passphrase chips when announce includes IFAC", async () => {
const wrapper = mountInterfacesPage();
await wrapper.setData({
discoveredInterfaces: [ifaceWithIfac],
discoveryConfig: {
discover_interfaces: true,
interface_discovery_sources: "",
interface_discovery_whitelist: "",
interface_discovery_blacklist: "",
required_discovery_value: null,
autoconnect_discovered_interfaces: 0,
network_identity: "",
},
});
const text = wrapper.text();
expect(text).toContain("interfaces.discovered_network_name");
expect(text).toContain("kin.earth");
expect(text).toContain("interfaces.discovered_passphrase");
expect(text).toMatch(/as\*+G9/);
expect(text).not.toContain("asty8vT8spXNQdCnPVMATbCKkwUxuzG9");
});
it("does not render passphrase row when announce omits IFAC", async () => {
const wrapper = mountInterfacesPage();
await wrapper.setData({
discoveredInterfaces: [
{
name: "open",
type: "BackboneInterface",
reachable_on: "10.0.0.1",
port: 4242,
discovery_hash: "open-1",
},
],
});
const text = wrapper.text();
expect(text).not.toContain("interfaces.discovered_network_name");
expect(text).not.toContain("interfaces.discovered_passphrase");
});
it("copyDiscoveredConfigEntry writes the config block to clipboard", async () => {
const wrapper = mountInterfacesPage();
wrapper.vm.copyDiscoveredConfigEntry(ifaceWithIfac);
expect(navigator.clipboard.writeText).toHaveBeenCalledWith(ifaceWithIfac.config_entry);
expect(ToastUtils.success).toHaveBeenCalled();
});
it("copyDiscoveredConfigEntry shows error when no config_entry is present", async () => {
const wrapper = mountInterfacesPage();
wrapper.vm.copyDiscoveredConfigEntry({ name: "no-config" });
expect(navigator.clipboard.writeText).not.toHaveBeenCalled();
expect(ToastUtils.error).toHaveBeenCalled();
});
it("useDiscoveredInterface stores prefill in sessionStorage and navigates", async () => {
const routerPush = vi.fn();
const wrapper = mount(InterfacesPage, {
global: {
stubs: {
RouterLink: true,
MaterialDesignIcon: true,
Toggle: true,
ImportInterfacesModal: true,
Interface: true,
},
mocks: {
$t: (key) => key,
$router: { push: routerPush },
},
},
});
wrapper.vm.useDiscoveredInterface(ifaceWithIfac);
const stored = JSON.parse(sessionStorage.getItem("meshchatx.discoveredInterfacePrefill"));
expect(stored.name).toBe("kin.earth");
expect(stored.type).toBe("BackboneInterface");
expect(stored.target_host).toBe("rns.kin.earth");
expect(stored.target_port).toBe(4242);
expect(stored.network_name).toBe("kin.earth");
expect(stored.passphrase).toBe("asty8vT8spXNQdCnPVMATbCKkwUxuzG9");
expect(stored.config_entry).toContain("[[kin.earth]]");
expect(routerPush).toHaveBeenCalledWith({
name: "interfaces.add",
query: { from_discovered: "1" },
});
});
it("Use this interface menu item appears for every discovered card", async () => {
const wrapper = mountInterfacesPage();
await wrapper.setData({
discoveredInterfaces: [ifaceWithIfac],
openDiscoveryActionKey: "kin-earth-1",
});
const useButton = wrapper.find('[data-testid="use-discovered-interface"]');
expect(useButton.exists()).toBe(true);
const copyCfg = wrapper.find('[data-testid="copy-discovered-config"]');
expect(copyCfg.exists()).toBe(true);
});
});
describe("AddInterfacePage discovered prefill", () => {
beforeEach(() => {
vi.clearAllMocks();
if (typeof sessionStorage !== "undefined") {
sessionStorage.clear();
}
mockAxios.get.mockResolvedValue({ data: {} });
mockAxios.post.mockResolvedValue({ data: { message: "ok" } });
});
const mountAdd = ($route = { query: { from_discovered: "1" } }) =>
mount(AddInterfacePage, {
global: {
mocks: {
$route,
$router: { push: vi.fn() },
$t: (msg) => msg,
},
stubs: ["RouterLink", "MaterialDesignIcon", "Toggle", "ExpandingSection", "FormLabel", "FormSubLabel"],
},
});
it("applies a discovered prefill payload to the form on mount", async () => {
sessionStorage.setItem(
"meshchatx.discoveredInterfacePrefill",
JSON.stringify({
name: "kin.earth",
type: "BackboneInterface",
target_host: "rns.kin.earth",
target_port: 4242,
transport_identity: "eea3d09f02143e157b3dae83060ee843",
network_name: "kin.earth",
passphrase: "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
})
);
const wrapper = mountAdd();
await wrapper.vm.$nextTick();
expect(wrapper.vm.newInterfaceName).toBe("kin.earth");
expect(wrapper.vm.newInterfaceType).toBe("BackboneInterface");
expect(wrapper.vm.newInterfaceTargetHost).toBe("rns.kin.earth");
expect(wrapper.vm.newInterfaceTargetPort).toBe(4242);
expect(wrapper.vm.sharedInterfaceSettings.network_name).toBe("kin.earth");
expect(wrapper.vm.sharedInterfaceSettings.passphrase).toBe("asty8vT8spXNQdCnPVMATbCKkwUxuzG9");
expect(sessionStorage.getItem("meshchatx.discoveredInterfacePrefill")).toBeNull();
});
it("uses config_entry directly when provided", async () => {
sessionStorage.setItem(
"meshchatx.discoveredInterfacePrefill",
JSON.stringify({
config_entry:
"[[Quick Backbone]]\ntype = BackboneInterface\nremote = node.example\ntarget_port = 4242\nnetwork_name = quick.net\npassphrase = quickpass",
})
);
const wrapper = mountAdd();
await wrapper.vm.$nextTick();
expect(mockAxios.post).toHaveBeenCalledWith(
"/api/v1/reticulum/interfaces/add",
expect.objectContaining({
name: "Quick Backbone",
type: "BackboneInterface",
target_host: "node.example",
target_port: 4242,
network_name: "quick.net",
passphrase: "quickpass",
})
);
});
it("does not crash when from_discovered query is set but no payload is in storage", async () => {
const wrapper = mountAdd();
await wrapper.vm.$nextTick();
expect(wrapper.vm.newInterfaceName).toBe(null);
});
it("end-to-end with config_entry: useDiscoveredInterface -> AddInterfacePage POST", async () => {
const routerPush = vi.fn();
const interfacesWrapper = mount(InterfacesPage, {
global: {
stubs: {
RouterLink: true,
MaterialDesignIcon: true,
Toggle: true,
ImportInterfacesModal: true,
Interface: true,
},
mocks: {
$t: (key) => key,
$router: { push: routerPush },
},
},
});
interfacesWrapper.vm.useDiscoveredInterface(ifaceWithIfac);
interfacesWrapper.unmount();
expect(routerPush).toHaveBeenCalledWith({
name: "interfaces.add",
query: { from_discovered: "1" },
});
const addWrapper = mountAdd({ query: { from_discovered: "1" } });
await addWrapper.vm.$nextTick();
await addWrapper.vm.$nextTick();
// when a config_entry is present, the prefill triggers an immediate
// POST through quickAddInterfaceFromConfig with the IFAC values intact
const postCall = mockAxios.post.mock.calls.find(([url]) => url === "/api/v1/reticulum/interfaces/add");
expect(postCall).toBeTruthy();
expect(postCall[1]).toEqual(
expect.objectContaining({
name: "kin.earth",
type: "BackboneInterface",
target_host: "rns.kin.earth",
target_port: 4242,
network_name: "kin.earth",
passphrase: "asty8vT8spXNQdCnPVMATbCKkwUxuzG9",
})
);
});
it("end-to-end without config_entry: prefill populates form fields", async () => {
const routerPush = vi.fn();
const ifaceNoCfg = { ...ifaceWithIfac, config_entry: null };
const interfacesWrapper = mount(InterfacesPage, {
global: {
stubs: {
RouterLink: true,
MaterialDesignIcon: true,
Toggle: true,
ImportInterfacesModal: true,
Interface: true,
},
mocks: {
$t: (key) => key,
$router: { push: routerPush },
},
},
});
interfacesWrapper.vm.useDiscoveredInterface(ifaceNoCfg);
interfacesWrapper.unmount();
const addWrapper = mountAdd({ query: { from_discovered: "1" } });
await addWrapper.vm.$nextTick();
expect(addWrapper.vm.newInterfaceName).toBe("kin.earth");
expect(addWrapper.vm.newInterfaceType).toBe("BackboneInterface");
expect(addWrapper.vm.newInterfaceTargetHost).toBe("rns.kin.earth");
expect(addWrapper.vm.newInterfaceTargetPort).toBe(4242);
expect(addWrapper.vm.sharedInterfaceSettings.network_name).toBe("kin.earth");
expect(addWrapper.vm.sharedInterfaceSettings.passphrase).toBe("asty8vT8spXNQdCnPVMATbCKkwUxuzG9");
});
});

View File

@@ -43,11 +43,23 @@ describe("LanguageSelector.vue", () => {
await wrapper.find("button").trigger("click");
const languageButtons = wrapper.findAll(".fixed button");
expect(languageButtons).toHaveLength(4);
expect(languageButtons[0].text()).toContain("English");
expect(languageButtons[1].text()).toContain("Deutsch");
expect(languageButtons[2].text()).toContain("Italiano");
expect(languageButtons[3].text()).toContain("\u0420\u0443\u0441\u0441\u043a\u0438\u0439");
const labels = languageButtons.map((b) => b.text());
// English is pinned to the front; remaining locales are sorted by display name
expect(labels[0]).toContain("English");
expect(labels).toEqual(
expect.arrayContaining([
expect.stringContaining("English"),
expect.stringContaining("Deutsch"),
expect.stringContaining("Español"),
expect.stringContaining("Français"),
expect.stringContaining("Italiano"),
expect.stringContaining("Nederlands"),
expect.stringContaining("\u0420\u0443\u0441\u0441\u043a\u0438\u0439"),
expect.stringContaining("\u4e2d\u6587"),
])
);
expect(languageButtons.length).toBeGreaterThanOrEqual(8);
});
it("emits language-change when a different language is selected", async () => {

View File

@@ -0,0 +1,207 @@
import { mount } from "@vue/test-utils";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import ConversationViewer from "@/components/messages/ConversationViewer.vue";
import WebSocketConnection from "@/js/WebSocketConnection";
import GlobalState from "@/js/GlobalState";
import DialogUtils from "@/js/DialogUtils";
vi.mock("@/js/DialogUtils", () => ({
default: {
confirm: vi.fn(() => Promise.resolve(true)),
alert: vi.fn(() => Promise.resolve()),
},
}));
describe("MessageSendingFailures.test.js", () => {
let axiosMock;
beforeEach(() => {
GlobalState.config.theme = "light";
GlobalState.config.message_outbound_bubble_color = "#4f46e5";
GlobalState.config.message_waiting_bubble_color = "#e5e7eb";
WebSocketConnection.connect();
axiosMock = {
get: vi.fn().mockImplementation((url) => {
if (url.includes("/path")) return Promise.resolve({ data: { path: [] } });
if (url.includes("/stamp-info")) return Promise.resolve({ data: { stamp_info: {} } });
if (url.includes("/lxmf-messages/conversation/"))
return Promise.resolve({ data: { lxmf_messages: [] } });
return Promise.resolve({ data: {} });
}),
post: vi.fn().mockImplementation(() => Promise.resolve({ data: { lxmf_message: { hash: "mock" } } })),
delete: vi.fn().mockResolvedValue({ data: {} }),
};
window.api = axiosMock;
// Mock URL.createObjectURL
window.URL.createObjectURL = vi.fn(() => "mock-url");
vi.spyOn(window, "open").mockImplementation(() => null);
});
afterEach(() => {
delete window.api;
vi.unstubAllGlobals();
WebSocketConnection.destroy();
});
const mountConversationViewer = (props = {}) => {
return mount(ConversationViewer, {
props: {
selectedPeer: { destination_hash: "test-hash", display_name: "Test Peer" },
myLxmfAddressHash: "my-hash",
conversations: [],
...props,
},
global: {
directives: { "click-outside": { mounted: () => {}, unmounted: () => {} } },
mocks: {
$t: (key) => key,
},
stubs: {
MaterialDesignIcon: true,
AddImageButton: true,
AddAudioButton: true,
SendMessageButton: true,
ConversationDropDownMenu: true,
PaperMessageModal: true,
AudioWaveformPlayer: true,
LxmfUserIcon: true,
},
},
});
};
it("handles API 503 failure when sending message", async () => {
const wrapper = mountConversationViewer();
wrapper.vm.newMessageText = "Hello failure";
axiosMock.post.mockRejectedValueOnce({
response: {
status: 503,
data: { message: "Sending failed" },
},
});
await wrapper.vm.sendMessage();
// Optimistic placeholder should be gone
const pendingItems = wrapper.vm.chatItems.filter((item) => item.lxmf_message.hash.startsWith("pending-"));
expect(pendingItems).toHaveLength(0);
// Alert should be shown
expect(DialogUtils.alert).toHaveBeenCalledWith("Sending failed");
});
it("updates UI when message state becomes failed via WebSocket", async () => {
const wrapper = mountConversationViewer();
const messageHash = "msg-123";
// Add a message that is currently "sending"
wrapper.vm.chatItems.push({
type: "lxmf_message",
is_outbound: true,
lxmf_message: {
hash: messageHash,
content: "Going to fail",
state: "sending",
progress: 50,
destination_hash: "test-hash",
source_hash: "my-hash",
fields: {},
},
});
// Simulate WebSocket update
wrapper.vm.onLxmfMessageUpdated({
hash: messageHash,
state: "failed",
progress: 50,
});
const updatedItem = wrapper.vm.chatItems.find((i) => i.lxmf_message.hash === messageHash);
expect(updatedItem.lxmf_message.state).toBe("failed");
await wrapper.vm.$nextTick();
// The retry button should be visible in the context menu if we were to open it
// (Testing the logic of onLxmfMessageUpdated is enough here as retry logic is tested elsewhere)
});
it("handles second image failure in multi-image send", async () => {
const wrapper = mountConversationViewer();
wrapper.vm.newMessageText = "Two images";
const image1 = new File([""], "image1.png", { type: "image/png" });
const image2 = new File([""], "image2.png", { type: "image/png" });
image1.arrayBuffer = vi.fn(() => Promise.resolve(new ArrayBuffer(8)));
image2.arrayBuffer = vi.fn(() => Promise.resolve(new ArrayBuffer(8)));
await wrapper.vm.onImageSelected(image1);
await wrapper.vm.onImageSelected(image2);
// First image succeeds, second fails
axiosMock.post
.mockResolvedValueOnce({
data: { lxmf_message: { hash: "hash-1", content: "Two images", state: "outbound" } },
})
.mockRejectedValueOnce({ response: { data: { message: "Second image failed" } } });
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
await wrapper.vm.sendMessage();
// Both images should be processed, but second one logs an error
const sendCalls = axiosMock.post.mock.calls.filter((c) => c[0] === "/api/v1/lxmf-messages/send");
expect(sendCalls.length).toBe(2);
expect(consoleSpy).toHaveBeenCalledWith(expect.stringContaining("Failed to send image 2"), expect.anything());
consoleSpy.mockRestore();
});
it("removes placeholder even if buildOutboundJobSnapshot fails", async () => {
const wrapper = mountConversationViewer();
wrapper.vm.newMessageText = "Fail early";
vi.spyOn(wrapper.vm, "buildOutboundJobSnapshot").mockRejectedValueOnce(new Error("Snapshot failed"));
await wrapper.vm.sendMessage();
expect(DialogUtils.alert).toHaveBeenCalledWith("Snapshot failed");
expect(wrapper.vm.chatItems).toHaveLength(0);
});
it("retrying a failed message sends it again", async () => {
const wrapper = mountConversationViewer();
const failedItem = {
type: "lxmf_message",
is_outbound: true,
lxmf_message: {
hash: "failed-hash",
state: "failed",
content: "retry me",
destination_hash: "test-hash",
source_hash: "my-hash",
fields: {},
},
};
wrapper.vm.chatItems = [failedItem];
axiosMock.post.mockResolvedValue({ data: { lxmf_message: { hash: "new-hash", state: "outbound" } } });
await wrapper.vm.retrySendingMessage(failedItem);
expect(axiosMock.post).toHaveBeenCalledWith(
"/api/v1/lxmf-messages/send",
expect.objectContaining({
lxmf_message: expect.objectContaining({
content: "retry me",
destination_hash: "test-hash",
}),
})
);
// Old item should be removed
expect(wrapper.vm.chatItems.find((i) => i.lxmf_message.hash === "failed-hash")).toBeUndefined();
// New item should be added
expect(wrapper.vm.chatItems.find((i) => i.lxmf_message.hash === "new-hash")).toBeDefined();
});
});

View File

@@ -0,0 +1,192 @@
import { mount, flushPromises } from "@vue/test-utils";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import ReticulumConfigEditorPage from "@/components/tools/ReticulumConfigEditorPage.vue";
import DialogUtils from "@/js/DialogUtils";
import GlobalState from "@/js/GlobalState";
vi.mock("@/js/DialogUtils", () => ({
default: {
confirm: vi.fn(),
alert: vi.fn(),
prompt: vi.fn(),
},
}));
const SAMPLE_CONFIG =
"[reticulum]\n enable_transport = False\n\n[interfaces]\n [[Default Interface]]\n type = AutoInterface\n";
const DEFAULT_CONFIG =
"[reticulum]\n enable_transport = False\n\n[interfaces]\n [[Default Interface]]\n type = AutoInterface\n enabled = true\n";
const CONFIG_PATH = "/tmp/.reticulum/config";
describe("ReticulumConfigEditorPage.vue", () => {
let axiosMock;
beforeEach(() => {
axiosMock = {
get: vi.fn(),
put: vi.fn(),
post: vi.fn(),
};
window.api = axiosMock;
axiosMock.get.mockResolvedValue({
data: { content: SAMPLE_CONFIG, path: CONFIG_PATH },
});
axiosMock.put.mockResolvedValue({
data: { message: "Reticulum config saved", path: CONFIG_PATH },
});
axiosMock.post.mockImplementation((url) => {
if (url === "/api/v1/reticulum/config/reset") {
return Promise.resolve({
data: {
message: "Reticulum config restored to defaults",
content: DEFAULT_CONFIG,
path: CONFIG_PATH,
},
});
}
if (url === "/api/v1/reticulum/reload") {
return Promise.resolve({
data: { message: "Reticulum reloaded successfully" },
});
}
return Promise.resolve({ data: {} });
});
GlobalState.hasPendingInterfaceChanges = false;
if (!GlobalState.modifiedInterfaceNames) {
GlobalState.modifiedInterfaceNames = new Set();
}
GlobalState.modifiedInterfaceNames.clear();
});
afterEach(() => {
delete window.api;
vi.clearAllMocks();
});
const mountPage = () => {
return mount(ReticulumConfigEditorPage, {
global: {
mocks: {
$t: (key) => key,
},
stubs: {
MaterialDesignIcon: {
template: '<div class="mdi-stub" :data-icon-name="iconName"></div>',
props: ["iconName"],
},
},
},
});
};
it("loads the current config on mount and shows the path", async () => {
const wrapper = mountPage();
await flushPromises();
expect(axiosMock.get).toHaveBeenCalledWith("/api/v1/reticulum/config/raw");
expect(wrapper.vm.content).toBe(SAMPLE_CONFIG);
expect(wrapper.vm.originalContent).toBe(SAMPLE_CONFIG);
expect(wrapper.text()).toContain(CONFIG_PATH);
expect(wrapper.text()).toContain("tools.reticulum_config_editor.title");
});
it("marks the editor as dirty when the textarea changes", async () => {
const wrapper = mountPage();
await flushPromises();
expect(wrapper.vm.isDirty).toBe(false);
const textarea = wrapper.find("textarea");
await textarea.setValue(SAMPLE_CONFIG + "\n# my edit\n");
expect(wrapper.vm.isDirty).toBe(true);
expect(wrapper.text()).toContain("tools.reticulum_config_editor.unsaved");
});
it("saves config and shows the restart banner after success", async () => {
const wrapper = mountPage();
await flushPromises();
const newContent = SAMPLE_CONFIG + "\n# edit\n";
await wrapper.find("textarea").setValue(newContent);
await wrapper.vm.saveConfig();
await flushPromises();
expect(axiosMock.put).toHaveBeenCalledWith("/api/v1/reticulum/config/raw", { content: newContent });
expect(wrapper.vm.hasSavedChanges).toBe(true);
expect(wrapper.vm.showRestartReminder).toBe(true);
expect(GlobalState.hasPendingInterfaceChanges).toBe(true);
expect(wrapper.text()).toContain("tools.reticulum_config_editor.restart_required");
});
it("does not save when the editor is not dirty", async () => {
const wrapper = mountPage();
await flushPromises();
await wrapper.vm.saveConfig();
expect(axiosMock.put).not.toHaveBeenCalled();
});
it("restores defaults after confirmation", async () => {
DialogUtils.confirm.mockResolvedValue(true);
const wrapper = mountPage();
await flushPromises();
await wrapper.vm.restoreDefaults();
await flushPromises();
expect(DialogUtils.confirm).toHaveBeenCalled();
expect(axiosMock.post).toHaveBeenCalledWith("/api/v1/reticulum/config/reset");
expect(wrapper.vm.content).toBe(DEFAULT_CONFIG);
expect(wrapper.vm.originalContent).toBe(DEFAULT_CONFIG);
expect(wrapper.vm.hasSavedChanges).toBe(true);
expect(GlobalState.hasPendingInterfaceChanges).toBe(true);
});
it("does not restore defaults if the user cancels", async () => {
DialogUtils.confirm.mockResolvedValue(false);
const wrapper = mountPage();
await flushPromises();
await wrapper.vm.restoreDefaults();
await flushPromises();
expect(axiosMock.post).not.toHaveBeenCalledWith("/api/v1/reticulum/config/reset");
expect(wrapper.vm.content).toBe(SAMPLE_CONFIG);
});
it("reloads RNS and clears the restart banner", async () => {
const wrapper = mountPage();
await flushPromises();
wrapper.vm.hasSavedChanges = true;
await wrapper.vm.$nextTick();
await wrapper.vm.reloadRns();
await flushPromises();
expect(axiosMock.post).toHaveBeenCalledWith("/api/v1/reticulum/reload");
expect(wrapper.vm.hasSavedChanges).toBe(false);
expect(GlobalState.hasPendingInterfaceChanges).toBe(false);
});
it("discards unsaved changes back to the original content", async () => {
const wrapper = mountPage();
await flushPromises();
await wrapper.find("textarea").setValue("[reticulum]\n[interfaces]\n# changed");
expect(wrapper.vm.isDirty).toBe(true);
wrapper.vm.discardChanges();
expect(wrapper.vm.content).toBe(SAMPLE_CONFIG);
expect(wrapper.vm.isDirty).toBe(false);
});
it("shows an error toast when load fails", async () => {
axiosMock.get.mockRejectedValueOnce({
response: { data: { error: "boom" } },
});
const wrapper = mountPage();
await flushPromises();
expect(wrapper.vm.content).toBe("");
});
});

View File

@@ -207,7 +207,7 @@ describe("SettingsPage — config persistence (PATCH and related)", () => {
expect(api.patch).toHaveBeenCalledWith(
"/api/v1/config",
expect.objectContaining({
allow_auto_resending_failed_messages_with_attachments: false,
allow_auto_resending_failed_messages_with_attachments: true,
})
);
await w.vm.onAutoSendFailedMessagesToPropagationNodeChange();
@@ -264,10 +264,21 @@ describe("SettingsPage — config persistence (PATCH and related)", () => {
});
});
it("onLxmfIncomingDeliveryPresetChange PATCHes preset size", async () => {
const w = await mountSettingsPage(api);
w.vm.lxmfIncomingDeliveryPreset = "1gb";
await w.vm.onLxmfIncomingDeliveryPresetChange();
expect(api.patch).toHaveBeenCalledWith("/api/v1/config", {
lxmf_delivery_transfer_limit_in_bytes: 1_000_000_000,
});
});
it("LXMF transfer/sync limits PATCH after debounce", async () => {
const w = await mountSettingsPage(api);
w.vm.lxmfDeliveryTransferLimitInputMb = 9;
await w.vm.onLxmfDeliveryTransferLimitChange();
w.vm.lxmfIncomingDeliveryPreset = "custom";
w.vm.lxmfIncomingDeliveryCustomAmount = 9;
w.vm.lxmfIncomingDeliveryCustomUnit = "mb";
await w.vm.onLxmfIncomingDeliveryCustomChange();
await vi.advanceTimersByTimeAsync(1000);
expect(api.patch).toHaveBeenCalledWith("/api/v1/config", {
lxmf_delivery_transfer_limit_in_bytes: 9_000_000,

View File

@@ -0,0 +1,197 @@
import { mount, flushPromises } from "@vue/test-utils";
import { describe, it, expect, vi, beforeAll, afterEach } from "vitest";
import StickerView from "@/components/stickers/StickerView.vue";
const origIntersectionObserver = globalThis.IntersectionObserver;
beforeAll(() => {
const ctx = {
fillStyle: "",
strokeStyle: "",
fillRect: vi.fn(),
clearRect: vi.fn(),
save: vi.fn(),
restore: vi.fn(),
translate: vi.fn(),
scale: vi.fn(),
rotate: vi.fn(),
beginPath: vi.fn(),
closePath: vi.fn(),
moveTo: vi.fn(),
lineTo: vi.fn(),
arc: vi.fn(),
fill: vi.fn(),
stroke: vi.fn(),
setTransform: vi.fn(),
drawImage: vi.fn(),
measureText: vi.fn(() => ({ width: 0 })),
createLinearGradient: vi.fn(() => ({ addColorStop: vi.fn() })),
createRadialGradient: vi.fn(() => ({ addColorStop: vi.fn() })),
};
HTMLCanvasElement.prototype.getContext = vi.fn(() => ctx);
});
describe("StickerView.vue", () => {
it("renders img for static sticker", () => {
const w = mount(StickerView, {
props: {
src: "https://example.invalid/sticker.png",
imageType: "png",
alt: "x",
},
});
expect(w.find("img").exists()).toBe(true);
expect(w.find("video").exists()).toBe(false);
w.unmount();
});
it("renders video for webm", () => {
const w = mount(StickerView, {
props: {
src: "https://example.invalid/s.webm",
imageType: "webm",
},
});
expect(w.find("video").exists()).toBe(true);
w.unmount();
});
it("TGS does not fetch until intersection reports in view", async () => {
let ioCallback;
class MockIntersectionObserver {
constructor(cb) {
ioCallback = cb;
}
observe = vi.fn();
disconnect = vi.fn();
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const baseFetch = globalThis.fetch;
const fetchSpy = vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => {
const reqUrl = typeof input === "string" ? input : (input?.url ?? "");
if (reqUrl.includes("example.invalid/a.tgs")) {
return {
arrayBuffer: () => Promise.resolve(new ArrayBuffer(8)),
};
}
return baseFetch(input, init);
});
const tgsCallCount = () =>
fetchSpy.mock.calls.filter((c) => {
const u = typeof c[0] === "string" ? c[0] : (c[0]?.url ?? "");
return String(u).includes("example.invalid/a.tgs");
}).length;
const w = mount(StickerView, {
props: {
src: "https://example.invalid/a.tgs",
imageType: "tgs",
},
attachTo: document.body,
});
await flushPromises();
expect(tgsCallCount()).toBe(0);
const root = w.vm.$refs.stickerRoot;
expect(root).toBeTruthy();
ioCallback([{ isIntersecting: true, target: root }]);
await w.vm.$nextTick();
await w.vm.$nextTick();
expect(w.vm.inView).toBe(true);
expect(w.vm.$refs.lottieMount).toBeTruthy();
await vi.waitFor(() => {
expect(tgsCallCount()).toBeGreaterThan(0);
});
fetchSpy.mockRestore();
w.unmount();
});
it("WebM calls play when in view and pause when out", async () => {
let ioCallback;
class MockIntersectionObserver {
constructor(cb) {
ioCallback = cb;
}
observe = vi.fn();
disconnect = vi.fn();
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const playSpy = vi.spyOn(HTMLVideoElement.prototype, "play").mockResolvedValue(undefined);
const pauseSpy = vi.spyOn(HTMLVideoElement.prototype, "pause").mockImplementation(() => {});
const w = mount(StickerView, {
props: {
src: "https://example.invalid/s.webm",
imageType: "webm",
},
attachTo: document.body,
});
await flushPromises();
const root = w.vm.$refs.stickerRoot;
ioCallback([{ isIntersecting: true, target: root }]);
await flushPromises();
expect(playSpy).toHaveBeenCalled();
ioCallback([{ isIntersecting: false, target: root }]);
await flushPromises();
expect(pauseSpy).toHaveBeenCalled();
playSpy.mockRestore();
pauseSpy.mockRestore();
w.unmount();
});
it("mountLottie fetches and decodes gzip TGS payload", async () => {
const { gzipSync } = await import("node:zlib");
const payload = JSON.stringify({
v: "5.5.7",
fr: 30,
ip: 0,
op: 30,
w: 512,
h: 512,
nm: "t",
ddd: 0,
assets: [],
layers: [],
});
const gz = gzipSync(Buffer.from(payload, "utf8"));
const ab = gz.buffer.slice(gz.byteOffset, gz.byteOffset + gz.byteLength);
const fetchMock = vi.fn(() =>
Promise.resolve({
arrayBuffer: () => Promise.resolve(ab),
})
);
vi.stubGlobal("fetch", fetchMock);
const w = mount(StickerView, {
props: {
src: "https://example.invalid/a.tgs",
imageType: "tgs",
},
attachTo: document.body,
});
await w.vm.$nextTick();
expect(w.vm.$refs.lottieMount).toBeTruthy();
w.vm.inView = true;
await w.vm.mountLottie();
await flushPromises();
expect(fetchMock).toHaveBeenCalled();
expect(w.vm.lottieAnim).toBeTruthy();
w.unmount();
vi.unstubAllGlobals();
});
afterEach(() => {
globalThis.IntersectionObserver = origIntersectionObserver;
vi.restoreAllMocks();
});
});

View File

@@ -19,6 +19,7 @@ describe("ToolsPage.vue", () => {
{ path: "/forwarder", name: "forwarder", component: { template: "div" } },
{ path: "/documentation", name: "documentation", component: { template: "div" } },
{ path: "/micron-editor", name: "micron-editor", component: { template: "div" } },
{ path: "/tools/reticulum-config-editor", name: "reticulum-config-editor", component: { template: "div" } },
{ path: "/paper-message", name: "paper-message", component: { template: "div" } },
{ path: "/rnode-flasher", name: "rnode-flasher", component: { template: "div" } },
{ path: "/debug-logs", name: "debug-logs", component: { template: "div" } },
@@ -45,14 +46,14 @@ describe("ToolsPage.vue", () => {
it("renders the tools page header", () => {
const wrapper = mountToolsPage();
expect(wrapper.text()).toContain("tools.utilities");
expect(wrapper.text()).toContain("tools.power_tools");
expect(wrapper.text()).not.toContain("tools.utilities");
});
it("renders all tool rows", () => {
const wrapper = mountToolsPage();
const toolRows = wrapper.findAll(".tool-row");
expect(toolRows.length).toBe(18);
expect(toolRows.length).toBe(19);
});
it("filters tools based on search query", async () => {
@@ -77,6 +78,6 @@ describe("ToolsPage.vue", () => {
await clearButton.trigger("click");
expect(wrapper.vm.searchQuery).toBe("");
expect(wrapper.vm.filteredTools.length).toBe(18);
expect(wrapper.vm.filteredTools.length).toBe(19);
});
});

View File

@@ -342,7 +342,7 @@ describe("SettingsPage Component", () => {
is_transport_enabled: true,
lxmf_local_propagation_node_enabled: false,
auto_resend_failed_messages_when_announce_received: true,
allow_auto_resending_failed_messages_with_attachments: false,
allow_auto_resending_failed_messages_with_attachments: true,
auto_send_failed_messages_to_propagation_node: false,
show_suggested_community_interfaces: true,
lxmf_local_propagation_node_enabled: false,

View File

@@ -126,7 +126,9 @@ describe("NetworkVisualiser Optimization and Abort", () => {
return acc;
}, {});
wrapper.vm.createIconImage = vi.fn().mockImplementation(() => new Promise((r) => setTimeout(() => r("blob:icon"), 50)));
wrapper.vm.createIconImage = vi
.fn()
.mockImplementation(() => new Promise((r) => setTimeout(() => r("blob:icon"), 50)));
await wrapper.vm.processVisualization();

View File

@@ -20,7 +20,7 @@ export function buildFullServerConfig(overrides = {}) {
theme: "dark",
language: "en",
auto_resend_failed_messages_when_announce_received: true,
allow_auto_resending_failed_messages_with_attachments: false,
allow_auto_resending_failed_messages_with_attachments: true,
auto_send_failed_messages_to_propagation_node: false,
show_suggested_community_interfaces: true,
lxmf_delivery_transfer_limit_in_bytes: 10_000_000,

View File

@@ -0,0 +1,74 @@
import { describe, it, expect, vi, afterEach } from "vitest";
import { attachInView, isAnimatedRasterType } from "@/js/inViewObserver.js";
describe("inViewObserver", () => {
describe("isAnimatedRasterType", () => {
it("returns true for gif and webp", () => {
expect(isAnimatedRasterType("gif")).toBe(true);
expect(isAnimatedRasterType("WEBP")).toBe(true);
});
it("returns false for png, jpeg, empty, null", () => {
expect(isAnimatedRasterType("png")).toBe(false);
expect(isAnimatedRasterType("jpeg")).toBe(false);
expect(isAnimatedRasterType("")).toBe(false);
expect(isAnimatedRasterType(null)).toBe(false);
});
});
describe("attachInView", () => {
const origIo = globalThis.IntersectionObserver;
afterEach(() => {
globalThis.IntersectionObserver = origIo;
});
it("invokes callback with intersecting true when IntersectionObserver is undefined", () => {
globalThis.IntersectionObserver = undefined;
const fn = vi.fn();
const el = document.createElement("div");
const disconnect = attachInView(el, fn);
expect(fn).toHaveBeenCalledTimes(1);
expect(fn.mock.calls[0][0].isIntersecting).toBe(true);
expect(fn.mock.calls[0][0].target).toBe(el);
disconnect();
});
it("returns noop disconnect when el is null", () => {
const fn = vi.fn();
const disconnect = attachInView(null, fn);
expect(fn).not.toHaveBeenCalled();
disconnect();
});
it("observes element and forwards entries to callback", () => {
const observe = vi.fn();
const disconnectIo = vi.fn();
let callback;
class MockIntersectionObserver {
constructor(cb, opts) {
callback = cb;
expect(opts.threshold).toBe(0.5);
expect(opts.rootMargin).toBe("10px");
}
observe(el) {
observe(el);
}
disconnect() {
disconnectIo();
}
}
globalThis.IntersectionObserver = MockIntersectionObserver;
const el = document.createElement("div");
const fn = vi.fn();
const disconnect = attachInView(el, fn, { threshold: 0.5, rootMargin: "10px" });
expect(observe).toHaveBeenCalledWith(el);
callback([{ isIntersecting: false, target: el }]);
expect(fn).toHaveBeenCalledWith(expect.objectContaining({ isIntersecting: false, target: el }));
disconnect();
expect(disconnectIo).toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,37 @@
import { describe, it, expect } from "vitest";
import {
INCOMING_DELIVERY_MAX_BYTES,
INCOMING_DELIVERY_PRESET_BYTES,
clampIncomingDeliveryBytes,
incomingDeliveryBytesFromCustom,
incomingDeliveryBytesFromPresetKey,
syncIncomingDeliveryFieldsFromBytes,
} from "../../meshchatx/src/frontend/js/settings/incomingDeliveryLimit.js";
describe("incomingDeliveryLimit", () => {
it("clamps to max 1 GiB", () => {
expect(clampIncomingDeliveryBytes(2_000_000_000)).toBe(INCOMING_DELIVERY_MAX_BYTES);
});
it("maps preset keys to bytes", () => {
expect(incomingDeliveryBytesFromPresetKey("1gb")).toBe(1_000_000_000);
expect(incomingDeliveryBytesFromPresetKey("custom")).toBeNull();
});
it("parses custom MB and GB", () => {
expect(incomingDeliveryBytesFromCustom(9, "mb")).toBe(9_000_000);
expect(incomingDeliveryBytesFromCustom(1, "gb")).toBe(1_000_000_000);
});
it("syncs preset field for exact preset bytes", () => {
const s = syncIncomingDeliveryFieldsFromBytes(INCOMING_DELIVERY_PRESET_BYTES["25mb"]);
expect(s.preset).toBe("25mb");
});
it("syncs custom for non-preset bytes", () => {
const s = syncIncomingDeliveryFieldsFromBytes(9_000_000);
expect(s.preset).toBe("custom");
expect(s.customAmount).toBe(9);
expect(s.customUnit).toBe("mb");
});
});

View File

@@ -0,0 +1,141 @@
import { gzipSync } from "node:zlib";
import { describe, it, expect, vi, beforeAll } from "vitest";
import { decodeTgsBuffer } from "@/js/tgsDecode.js";
beforeAll(() => {
const ctx = {
fillStyle: "",
strokeStyle: "",
fillRect: vi.fn(),
clearRect: vi.fn(),
save: vi.fn(),
restore: vi.fn(),
translate: vi.fn(),
scale: vi.fn(),
rotate: vi.fn(),
beginPath: vi.fn(),
closePath: vi.fn(),
moveTo: vi.fn(),
lineTo: vi.fn(),
arc: vi.fn(),
fill: vi.fn(),
stroke: vi.fn(),
setTransform: vi.fn(),
drawImage: vi.fn(),
measureText: vi.fn(() => ({ width: 0 })),
createLinearGradient: vi.fn(() => ({ addColorStop: vi.fn() })),
createRadialGradient: vi.fn(() => ({ addColorStop: vi.fn() })),
};
HTMLCanvasElement.prototype.getContext = vi.fn(() => ctx);
});
function randomUint8Array(n) {
const u = new Uint8Array(n);
crypto.getRandomValues(u);
return u;
}
function randomJsonValue(depth) {
if (depth <= 0) {
return null;
}
const r = Math.random();
if (r < 0.2) {
return null;
}
if (r < 0.4) {
return Math.floor(Math.random() * 1_000_000);
}
if (r < 0.55) {
return String.fromCharCode(32 + Math.floor(Math.random() * 80));
}
if (r < 0.75) {
const n = Math.floor(Math.random() * 8);
return Array.from({ length: n }, () => randomJsonValue(depth - 1));
}
const n = Math.floor(Math.random() * 6);
const o = {};
for (let i = 0; i < n; i++) {
o[`k${i}`] = randomJsonValue(depth - 1);
}
return o;
}
function randomLottieLikeJson() {
const base = {
v: "5.5.7",
fr: 30,
ip: 0,
op: 60,
w: 512,
h: 512,
nm: "x",
ddd: 0,
assets: [],
layers: [],
};
if (Math.random() < 0.25) {
base.layers = [{ ty: 4, nm: "s", ind: 1, ks: {}, ip: 0, op: 60, st: 0, bm: 0 }];
}
if (Math.random() < 0.2) {
delete base.w;
}
if (Math.random() < 0.2) {
delete base.h;
}
if (Math.random() < 0.2) {
base.extra = randomJsonValue(5);
}
return base;
}
describe("fuzzing: TGS decode and lottie_light", () => {
it("fuzzing: decodeTgsBuffer handles random buffers without unhandled rejection", async () => {
for (let i = 0; i < 2000; i++) {
const len = Math.floor(Math.random() * 6144);
const buf = randomUint8Array(len).buffer;
try {
await decodeTgsBuffer(buf);
} catch {
/* JSON.parse, gzip, or missing DecompressionStream */
}
}
expect(true).toBe(true);
});
it("fuzzing: decodeTgsBuffer handles gzip-compressed random JSON", async () => {
for (let i = 0; i < 400; i++) {
const payload = JSON.stringify(randomJsonValue(6));
const gz = gzipSync(Buffer.from(payload, "utf8"));
const ab = gz.buffer.slice(gz.byteOffset, gz.byteOffset + gz.byteLength);
try {
await decodeTgsBuffer(ab);
} catch {
/* invalid JSON after decompress */
}
}
expect(true).toBe(true);
});
it("fuzzing: lottie_light loadAnimation random animationData does not crash the runner", async () => {
const lottieMod = await import("lottie-web/build/player/lottie_light.js");
const lib = lottieMod.default || lottieMod;
const container = document.createElement("div");
for (let i = 0; i < 500; i++) {
const animationData = Math.random() < 0.6 ? randomLottieLikeJson() : randomJsonValue(5);
try {
const anim = lib.loadAnimation({
container,
renderer: "svg",
loop: true,
autoplay: false,
animationData,
});
anim.destroy();
} catch {
/* malformed animation graph */
}
}
expect(true).toBe(true);
});
});

View File

@@ -3,6 +3,19 @@ import { injectMeshchatThemeVariables } from "../../meshchatx/src/frontend/theme
injectMeshchatThemeVariables(typeof document !== "undefined" ? document : undefined);
if (typeof Blob !== "undefined" && typeof Blob.prototype.stream !== "function") {
Blob.prototype.stream = function streamPolyfill() {
const blob = this;
return new ReadableStream({
async start(controller) {
const u8 = new Uint8Array(await blob.arrayBuffer());
controller.enqueue(u8);
controller.close();
},
});
};
}
import { readFileSync, existsSync } from "fs";
import { join } from "path";
import { vi } from "vitest";

View File

@@ -0,0 +1,27 @@
import { describe, it, expect } from "vitest";
import { gzipSync } from "node:zlib";
import { decodeTgsBuffer } from "@/js/tgsDecode.js";
describe("tgsDecode", () => {
it("decodes gzip-wrapped Lottie JSON", async () => {
const json = JSON.stringify({ v: "5", fr: 30, ip: 0, op: 10, w: 100, h: 100, layers: [] });
const gz = gzipSync(Buffer.from(json, "utf8"));
const buf = gz.buffer.slice(gz.byteOffset, gz.byteOffset + gz.byteLength);
const data = await decodeTgsBuffer(buf);
expect(data.w).toBe(100);
expect(data.h).toBe(100);
});
it("decodes raw JSON without gzip header", async () => {
const json = '{"a":1}';
const enc = new TextEncoder();
const data = await decodeTgsBuffer(enc.encode(json).buffer);
expect(data.a).toBe(1);
});
it("rejects invalid JSON after gzip", async () => {
const gz = gzipSync(Buffer.from("not-json{{{", "utf8"));
const buf = gz.buffer.slice(gz.byteOffset, gz.byteOffset + gz.byteLength);
await expect(decodeTgsBuffer(buf)).rejects.toThrow();
});
});