mirror of
https://git.quad4.io/RNS-Things/MeshChatX.git
synced 2026-04-27 04:15:38 +00:00
refactor(tests): improve code readability by formatting and simplifying assertions in test files
This commit is contained in:
@@ -7,9 +7,7 @@ from __future__ import annotations
|
||||
import gzip
|
||||
import json
|
||||
|
||||
TINY_PNG = (
|
||||
bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) + b"\x00" * 32
|
||||
)
|
||||
TINY_PNG = bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) + b"\x00" * 32
|
||||
|
||||
TINY_GIF = b"GIF89a" + b"\x00" * 32
|
||||
|
||||
|
||||
@@ -38,7 +38,9 @@ def _build_wav_pcm16(
|
||||
wf.setframerate(samplerate)
|
||||
frames = bytearray()
|
||||
for i in range(n_samples):
|
||||
sample = int(0.3 * 32767 * math.sin(2 * math.pi * frequency * (i / samplerate)))
|
||||
sample = int(
|
||||
0.3 * 32767 * math.sin(2 * math.pi * frequency * (i / samplerate))
|
||||
)
|
||||
for _ in range(channels):
|
||||
frames.extend(struct.pack("<h", sample))
|
||||
wf.writeframes(bytes(frames))
|
||||
@@ -84,7 +86,8 @@ def test_encode_pcm_to_ogg_opus_writes_valid_container():
|
||||
assert f.read(4) == b"OggS"
|
||||
assert os.path.getsize(out) > 0
|
||||
finally:
|
||||
os.unlink(out)
|
||||
if os.path.exists(out):
|
||||
os.unlink(out)
|
||||
|
||||
|
||||
def test_encode_pcm_to_ogg_opus_resamples_arbitrary_rate():
|
||||
@@ -104,7 +107,8 @@ def test_encode_pcm_to_ogg_opus_resamples_arbitrary_rate():
|
||||
with open(out, "rb") as f:
|
||||
assert f.read(4) == b"OggS"
|
||||
finally:
|
||||
os.unlink(out)
|
||||
if os.path.exists(out):
|
||||
os.unlink(out)
|
||||
|
||||
|
||||
def test_encode_audio_to_ogg_opus_decodes_and_reencodes(tmp_path):
|
||||
@@ -190,14 +194,17 @@ def test_encode_pcm_to_ogg_opus_preserves_duration(duration_seconds):
|
||||
sr = 48000
|
||||
n = int(sr * duration_seconds)
|
||||
t = np.arange(n, dtype=np.float32) / sr
|
||||
samples = (0.3 * np.sin(2 * math.pi * 440.0 * t)).astype(np.float32).reshape(-1, 1)
|
||||
samples = (
|
||||
(0.3 * np.sin(2 * math.pi * 440.0 * t)).astype(np.float32).reshape(-1, 1)
|
||||
)
|
||||
audio_codec.encode_pcm_to_ogg_opus(samples, sr, 1, out)
|
||||
encoded = _ogg_opus_duration_seconds(out)
|
||||
assert abs(encoded - duration_seconds) < 0.001, (
|
||||
f"expected {duration_seconds}s, got {encoded}s"
|
||||
)
|
||||
finally:
|
||||
os.unlink(out)
|
||||
if os.path.exists(out):
|
||||
os.unlink(out)
|
||||
|
||||
|
||||
def test_encode_pcm_to_ogg_opus_audio_profile_keeps_stereo():
|
||||
@@ -223,4 +230,5 @@ def test_encode_pcm_to_ogg_opus_audio_profile_keeps_stereo():
|
||||
assert channels == 2
|
||||
assert _ogg_opus_duration_seconds(out) == pytest.approx(1.0, abs=0.001)
|
||||
finally:
|
||||
os.unlink(out)
|
||||
if os.path.exists(out):
|
||||
os.unlink(out)
|
||||
|
||||
@@ -61,10 +61,9 @@ async def find_route_handler(app_instance, path, method):
|
||||
|
||||
|
||||
def test_normalize_handles_non_list_input():
|
||||
assert (
|
||||
ReticulumMeshChat.normalize_discovered_ifac_fields({"foo": "bar"})
|
||||
== {"foo": "bar"}
|
||||
)
|
||||
assert ReticulumMeshChat.normalize_discovered_ifac_fields({"foo": "bar"}) == {
|
||||
"foo": "bar"
|
||||
}
|
||||
assert ReticulumMeshChat.normalize_discovered_ifac_fields(None) is None
|
||||
|
||||
|
||||
|
||||
@@ -29,8 +29,10 @@ _JSON_LEAF = (
|
||||
def _recursive_json(max_leaves: int = 24):
|
||||
return st.recursive(
|
||||
_JSON_LEAF,
|
||||
lambda children: st.lists(children, max_size=8)
|
||||
| st.dictionaries(st.text(max_size=12), children, max_size=8),
|
||||
lambda children: (
|
||||
st.lists(children, max_size=8)
|
||||
| st.dictionaries(st.text(max_size=12), children, max_size=8)
|
||||
),
|
||||
max_leaves=max_leaves,
|
||||
)
|
||||
|
||||
@@ -60,7 +62,9 @@ def test_parse_tgs_gzip_json_fuzz(payload):
|
||||
merged.setdefault("op", 60.0)
|
||||
merged.setdefault("w", 100)
|
||||
merged.setdefault("h", 100)
|
||||
raw = gzip.compress(json.dumps(merged, default=str).encode("utf-8", errors="surrogateescape"))
|
||||
raw = gzip.compress(
|
||||
json.dumps(merged, default=str).encode("utf-8", errors="surrogateescape")
|
||||
)
|
||||
if len(raw) > sticker_utils.MAX_ANIMATED_BYTES:
|
||||
raw = raw[: sticker_utils.MAX_ANIMATED_BYTES]
|
||||
try:
|
||||
@@ -87,7 +91,18 @@ def test_parse_webm_fuzz_never_raises_unexpected(tail):
|
||||
st.none(),
|
||||
st.text(max_size=48),
|
||||
st.sampled_from(
|
||||
["png", "jpeg", "webp", "gif", "bmp", "tgs", "webm", "svg", "image/png", ""],
|
||||
[
|
||||
"png",
|
||||
"jpeg",
|
||||
"webp",
|
||||
"gif",
|
||||
"bmp",
|
||||
"tgs",
|
||||
"webm",
|
||||
"svg",
|
||||
"image/png",
|
||||
"",
|
||||
],
|
||||
),
|
||||
),
|
||||
raw=st.binary(min_size=0, max_size=8192),
|
||||
@@ -103,7 +118,9 @@ def test_extract_metadata_fuzz_never_raises(image_type, raw):
|
||||
typ=st.one_of(
|
||||
st.none(),
|
||||
st.text(max_size=48),
|
||||
st.sampled_from(["png", "jpeg", "jpg", "webp", "gif", "bmp", "tgs", "webm", "svg", ""]),
|
||||
st.sampled_from(
|
||||
["png", "jpeg", "jpg", "webp", "gif", "bmp", "tgs", "webm", "svg", ""]
|
||||
),
|
||||
),
|
||||
strict=st.booleans(),
|
||||
)
|
||||
@@ -148,7 +165,9 @@ def test_mime_for_image_type_fuzz_never_raises(t):
|
||||
description=st.one_of(st.none(), st.text(max_size=400)),
|
||||
pack_type=st.one_of(st.none(), st.text(max_size=40)),
|
||||
)
|
||||
def test_sticker_pack_sanitizers_fuzz_never_raises(title, short_name, description, pack_type):
|
||||
def test_sticker_pack_sanitizers_fuzz_never_raises(
|
||||
title, short_name, description, pack_type
|
||||
):
|
||||
sticker_pack_utils.sanitize_pack_title(title)
|
||||
sticker_pack_utils.sanitize_pack_short_name(short_name)
|
||||
sticker_pack_utils.sanitize_pack_description(description)
|
||||
@@ -234,7 +253,10 @@ def test_strict_tgs_from_structured_gzip_json_fuzz(inner):
|
||||
|
||||
@settings(max_examples=150, deadline=None)
|
||||
@given(
|
||||
b64=st.one_of(st.text(max_size=400), st.binary(max_size=200).map(lambda b: base64.b64encode(b).decode("ascii"))),
|
||||
b64=st.one_of(
|
||||
st.text(max_size=400),
|
||||
st.binary(max_size=200).map(lambda b: base64.b64encode(b).decode("ascii")),
|
||||
),
|
||||
)
|
||||
def test_sticker_validate_export_document_sticker_rows_fuzz(b64):
|
||||
doc = {
|
||||
|
||||
@@ -452,7 +452,9 @@ def _build_wav_pcm16(samplerate=48000, duration_seconds=0.5, frequency=440.0):
|
||||
wf.setframerate(samplerate)
|
||||
frames = bytearray()
|
||||
for i in range(n_samples):
|
||||
sample = int(0.3 * 32767 * math.sin(2 * math.pi * frequency * (i / samplerate)))
|
||||
sample = int(
|
||||
0.3 * 32767 * math.sin(2 * math.pi * frequency * (i / samplerate))
|
||||
)
|
||||
frames.extend(struct.pack("<h", sample))
|
||||
wf.writeframes(bytes(frames))
|
||||
return buf.getvalue()
|
||||
|
||||
@@ -140,9 +140,7 @@ class TestRequireUserFacingFlag:
|
||||
)
|
||||
# Without ``require_user_facing`` the helper preserves its old behavior
|
||||
# so the conversation list (which renders reactions) is unaffected.
|
||||
assert (
|
||||
compute_lxmf_conversation_unread_from_latest_row(row) is True
|
||||
)
|
||||
assert compute_lxmf_conversation_unread_from_latest_row(row) is True
|
||||
|
||||
def test_reaction_filtered_when_require_user_facing(self):
|
||||
row = _row(
|
||||
|
||||
@@ -98,7 +98,9 @@ async def test_download_firmware_returns_zip_for_allowed_url(web_app):
|
||||
async with TestClient(TestServer(aio_app)) as client:
|
||||
r = await client.get(
|
||||
"/api/v1/tools/rnode/download_firmware",
|
||||
params={"url": "https://github.com/owner/repo/releases/download/v1/firmware.zip"},
|
||||
params={
|
||||
"url": "https://github.com/owner/repo/releases/download/v1/firmware.zip"
|
||||
},
|
||||
)
|
||||
assert r.status == 200
|
||||
assert r.headers.get("Content-Type", "").startswith("application/zip")
|
||||
@@ -122,7 +124,9 @@ async def test_download_firmware_propagates_upstream_error_status(web_app):
|
||||
async with TestClient(TestServer(aio_app)) as client:
|
||||
r = await client.get(
|
||||
"/api/v1/tools/rnode/download_firmware",
|
||||
params={"url": "https://git.quad4.io/Reticulum/RNode_Firmware/releases/download/v1/firmware.zip"},
|
||||
params={
|
||||
"url": "https://git.quad4.io/Reticulum/RNode_Firmware/releases/download/v1/firmware.zip"
|
||||
},
|
||||
)
|
||||
assert r.status == 404
|
||||
body = await r.json()
|
||||
@@ -140,7 +144,9 @@ async def test_download_firmware_returns_500_on_exception(web_app):
|
||||
async with TestClient(TestServer(aio_app)) as client:
|
||||
r = await client.get(
|
||||
"/api/v1/tools/rnode/download_firmware",
|
||||
params={"url": "https://github.com/owner/repo/releases/download/v1/firmware.zip"},
|
||||
params={
|
||||
"url": "https://github.com/owner/repo/releases/download/v1/firmware.zip"
|
||||
},
|
||||
)
|
||||
assert r.status == 500
|
||||
body = await r.json()
|
||||
@@ -190,7 +196,12 @@ async def test_latest_release_returns_proxied_payload(web_app):
|
||||
aio_app = _build_aio_app(web_app)
|
||||
payload = {
|
||||
"tag_name": "v1.83",
|
||||
"assets": [{"name": "rnode_firmware_heltec32v3.zip", "browser_download_url": "https://x/rnode.zip"}],
|
||||
"assets": [
|
||||
{
|
||||
"name": "rnode_firmware_heltec32v3.zip",
|
||||
"browser_download_url": "https://x/rnode.zip",
|
||||
}
|
||||
],
|
||||
}
|
||||
fake_session = _FakeJsonSession(200, payload)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user