Files
pyxis/tests/interop/conftest.py
torlando-tech 6e47cb808b Increase playback buffer for jitter-free LXST RX audio
PCM_RING_FRAMES 16→50 (320ms→1000ms capacity) and
PREBUFFER_FRAMES 3→15 (60ms→300ms prebuffer) to match
LXST-kt's buffering strategy. Interop test suite confirms
zero underruns with ±100ms jitter at these settings.

Also adds tests/interop/ with 48 Python tests verifying
wire format, codec round-trip, and pipeline compatibility
between Pyxis, Python LXST, and LXST-kt implementations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 10:57:07 -05:00

364 lines
11 KiB
Python

"""
Shared fixtures for LXST interop tests.
These tests verify wire-format and codec compatibility between:
- Pyxis (ESP32 C++ implementation)
- Python LXST reference implementation
- LXST-kt (Kotlin/Android) — same wire format as Python LXST
"""
import sys
import math
import struct
import numpy as np
import pytest
# Add Python LXST to path
sys.path.insert(0, "$HOME/repos/public/LXST")
import pycodec2
from RNS.vendor import umsgpack as msgpack
# ── Constants matching all three implementations ──
# Msgpack field keys (Network.py, UIManager.cpp, PacketRouter.kt)
FIELD_SIGNALLING = 0x00
FIELD_FRAMES = 0x01
# Codec type header bytes (Codecs/__init__.py, Packetizer.kt, UIManager.h)
CODEC_NULL = 0xFF
CODEC_RAW = 0x00
CODEC_OPUS = 0x01
CODEC_CODEC2 = 0x02
# Codec2 mode headers (Codec2.py, Codec2.kt, codec_wrapper.cpp)
MODE_HEADERS = {
700: 0x00, 1200: 0x01, 1300: 0x02, 1400: 0x03,
1600: 0x04, 2400: 0x05, 3200: 0x06,
}
HEADER_MODES = {v: k for k, v in MODE_HEADERS.items()}
# Codec2 library mode mapping (matches codec_wrapper.cpp headerToLibraryMode)
MODE_TO_LIBRARY = {
3200: 0, 2400: 1, 1600: 2, 1400: 3, 1300: 4, 1200: 5, 700: 8,
}
# Signalling constants (UIManager.h, Signalling in LXST-kt)
STATUS_BUSY = 0x00
STATUS_REJECTED = 0x01
STATUS_CALLING = 0x02
STATUS_AVAILABLE = 0x03
STATUS_RINGING = 0x04
STATUS_CONNECTING = 0x05
STATUS_ESTABLISHED = 0x06
PREFERRED_PROFILE = 0xFF
# Profile IDs (Profile.kt)
PROFILE_ULBW = 0x10
PROFILE_VLBW = 0x20
PROFILE_LBW = 0x30
PROFILE_MQ = 0x40
@pytest.fixture
def codec2_3200():
"""Create a pycodec2 instance for 3200 bps mode."""
return pycodec2.Codec2(3200)
@pytest.fixture
def codec2_1600():
"""Create a pycodec2 instance for 1600 bps mode."""
return pycodec2.Codec2(1600)
@pytest.fixture
def lxst_codec2_3200():
"""Create a Python LXST Codec2 instance for 3200 bps."""
from LXST.Codecs.Codec2 import Codec2
return Codec2(mode=3200)
@pytest.fixture
def lxst_codec2_1600():
"""Create a Python LXST Codec2 instance for 1600 bps."""
from LXST.Codecs.Codec2 import Codec2
return Codec2(mode=1600)
@pytest.fixture
def sine_8khz_200ms():
"""Generate 200ms of 440Hz sine wave at 8kHz sample rate (int16)."""
sr = 8000
duration = 0.2
t = np.arange(int(sr * duration)) / sr
samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16)
return samples
@pytest.fixture
def sine_8khz_1s():
"""Generate 1 second of 440Hz sine wave at 8kHz sample rate (int16)."""
sr = 8000
duration = 1.0
t = np.arange(int(sr * duration)) / sr
samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16)
return samples
def codec2_mode_header(codec2):
"""
Get the wire-format mode header byte for a pycodec2 instance.
pycodec2 doesn't expose the mode directly, so we infer from bytes_per_frame
and samples_per_frame.
"""
spf = codec2.samples_per_frame()
bpf = codec2.bytes_per_frame()
# Map (spf, bpf) to mode header
KNOWN_PARAMS = {
(160, 8): 0x06, # 3200
(160, 8): 0x06, # 3200
(320, 8): 0x04, # 1600
(320, 7): 0x03, # 1400
(320, 7): 0x02, # 1300 (same bpf as 1400 but different params)
(320, 6): 0x01, # 1200
(320, 4): 0x00, # 700C
}
# This is ambiguous for some modes. Use a different approach:
# Try each mode and check SPF match
for mode, header in MODE_HEADERS.items():
lib_mode = MODE_TO_LIBRARY[mode]
test_codec = pycodec2.Codec2(mode)
if test_codec.samples_per_frame() == spf and test_codec.bytes_per_frame() == bpf:
# Could be multiple matches (1300 vs 1400 both have spf=320, bpf=7)
# Return first match — caller should ensure they pass the right codec
return header
raise ValueError(f"Unknown codec2 params: spf={spf} bpf={bpf}")
def encode_codec2_subframes(codec2, pcm_int16, mode_header=None):
"""
Encode PCM samples into individual Codec2 sub-frames.
Simulates Pyxis i2s_capture.cpp + codec_wrapper.cpp:
Each encode produces [mode_header(1)] + [raw_encoded(N)] bytes.
pycodec2.encode() returns only raw bytes, so we prepend the mode header
to match what Pyxis codec_wrapper.cpp does.
Returns list of (1 + bytes_per_frame)-byte encoded frames.
"""
spf = codec2.samples_per_frame()
bpf = codec2.bytes_per_frame()
n_frames = len(pcm_int16) // spf
if mode_header is None:
mode_header = codec2_mode_header(codec2)
frames = []
for i in range(n_frames):
chunk = pcm_int16[i * spf:(i + 1) * spf]
raw_encoded = codec2.encode(chunk)
# Prepend mode header (matches Pyxis codec_wrapper.cpp encode())
frame = bytes([mode_header]) + raw_encoded
frames.append(frame)
return frames
def batch_subframes_pyxis_style(subframes, mode_header):
"""
Batch multiple Codec2 sub-frames into Pyxis wire format.
Simulates UIManager.cpp call_send_audio_batch TX batching:
- First frame: keep [mode_header] + [data]
- Subsequent frames: strip mode header, append raw data only
- Result: [mode_header(1)] + [N * bytes_per_frame]
Returns the batch_data bytes (without codec_type prefix).
"""
if not subframes:
return b""
# First frame keeps header + data
batch = bytearray(subframes[0])
# Subsequent frames: strip 1-byte mode header
for frame in subframes[1:]:
batch.extend(frame[1:]) # skip mode header byte
return bytes(batch)
def build_pyxis_audio_packet(batch_data):
"""
Build a complete Pyxis-format wire packet for audio.
Simulates UIManager.cpp call_send_audio_batch:
{0x01: bin8([codec_type(0x02)] + batch_data)}
Where batch_data = [mode_header] + [N * raw_codec2_bytes]
"""
# Prepend codec type byte
frame_bytes = bytes([CODEC_CODEC2]) + batch_data
packet_data = {FIELD_FRAMES: frame_bytes}
return msgpack.packb(packet_data)
def build_columba_audio_packet(codec_type_byte, encoded_frame):
"""
Build a Columba/LXST-kt format wire packet for audio.
Simulates Packetizer.kt + Python LXST Packetizer:
{0x01: bytes([codec_type] + encoded_frame)}
Where encoded_frame = [mode_header] + [N * raw_codec2_bytes]
"""
frame_bytes = bytes([codec_type_byte]) + encoded_frame
packet_data = {FIELD_FRAMES: frame_bytes}
return msgpack.packb(packet_data)
def build_signal_packet(signal_value):
"""
Build a signalling packet matching all implementations.
Format: {0x00: [signal_value]}
"""
packet_data = {FIELD_SIGNALLING: [signal_value]}
return msgpack.packb(packet_data)
def parse_pyxis_rx(wire_bytes):
"""
Simulate Pyxis call_on_packet() parsing.
Parses the raw msgpack bytes exactly as UIManager.cpp does:
- Check for fixmap(1): 0x81
- Field 0x00 = signalling, field 0x01 = audio
- Audio: bin8/bin16 → single frame, fixarray → batched frames
- Returns list of (codec_type, codec_data) tuples for audio frames
- Returns list of signal values for signalling packets
"""
buf = wire_bytes
if len(buf) < 4:
return {"error": "packet too short"}
if buf[0] != 0x81:
return {"error": f"expected fixmap(1), got 0x{buf[0]:02x}"}
field = buf[1]
result = {"field": field, "frames": [], "signals": []}
if field == FIELD_SIGNALLING:
# {0x00: [signal]}
if buf[2] != 0x91:
return {"error": f"expected fixarray(1), got 0x{buf[2]:02x}"}
if buf[3] <= 0x7F:
result["signals"].append(buf[3])
elif buf[3] == 0xCC and len(buf) >= 5:
result["signals"].append(buf[4])
elif buf[3] == 0xCD and len(buf) >= 6:
result["signals"].append((buf[4] << 8) | buf[5])
else:
return {"error": f"unparseable signal 0x{buf[3]:02x}"}
elif field == FIELD_FRAMES:
fmt = buf[2]
if (fmt & 0xF0) == 0x90:
# fixarray: batched frames
array_len = fmt & 0x0F
pos = 3
for _ in range(array_len):
if pos >= len(buf):
break
if buf[pos] == 0xC4:
frame_len = buf[pos + 1]
frame_start = pos + 2
elif buf[pos] == 0xC5:
frame_len = (buf[pos + 1] << 8) | buf[pos + 2]
frame_start = pos + 3
else:
break
if frame_start + frame_len > len(buf) or frame_len < 2:
break
frame = buf[frame_start:frame_start + frame_len]
codec_type = frame[0]
codec_data = frame[1:]
result["frames"].append((codec_type, bytes(codec_data)))
pos = frame_start + frame_len
elif fmt == 0xC4:
# bin8: single frame
if len(buf) < 5:
return {"error": "bin8 too short"}
frame_len = buf[3]
if len(buf) < 4 + frame_len or frame_len < 2:
return {"error": "bin8 frame truncated"}
frame = buf[4:4 + frame_len]
codec_type = frame[0]
codec_data = frame[1:]
result["frames"].append((codec_type, bytes(codec_data)))
elif fmt == 0xC5:
# bin16: single frame
if len(buf) < 6:
return {"error": "bin16 too short"}
frame_len = (buf[3] << 8) | buf[4]
if len(buf) < 5 + frame_len or frame_len < 2:
return {"error": "bin16 frame truncated"}
frame = buf[5:5 + frame_len]
codec_type = frame[0]
codec_data = frame[1:]
result["frames"].append((codec_type, bytes(codec_data)))
return result
def parse_lxst_python_rx(wire_bytes):
"""
Simulate Python LXST LinkSource._packet() parsing.
This is the actual reference implementation logic from Network.py.
Returns list of (codec_type_int, raw_codec_data_bytes) tuples.
"""
unpacked = msgpack.unpackb(wire_bytes)
results = {"frames": [], "signals": []}
if not isinstance(unpacked, dict):
return {"error": "not a dict"}
if FIELD_FRAMES in unpacked:
frames = unpacked[FIELD_FRAMES]
if not isinstance(frames, list):
frames = [frames]
for frame in frames:
codec_type_byte = frame[0]
codec_data = frame[1:]
results["frames"].append((codec_type_byte, bytes(codec_data)))
if FIELD_SIGNALLING in unpacked:
signals = unpacked[FIELD_SIGNALLING]
if not isinstance(signals, list):
signals = [signals]
results["signals"] = signals
return results
def snr_db(original, reconstructed):
"""Calculate Signal-to-Noise Ratio in dB between original and reconstructed signals."""
# Align lengths
min_len = min(len(original), len(reconstructed))
orig = original[:min_len].astype(np.float64)
recon = reconstructed[:min_len].astype(np.float64)
signal_power = np.mean(orig ** 2)
noise_power = np.mean((orig - recon) ** 2)
if noise_power == 0:
return float("inf")
if signal_power == 0:
return 0.0
return 10 * math.log10(signal_power / noise_power)