mirror of
https://github.com/torlando-tech/pyxis.git
synced 2026-03-30 13:45:38 +00:00
PCM_RING_FRAMES 16→50 (320ms→1000ms capacity) and PREBUFFER_FRAMES 3→15 (60ms→300ms prebuffer) to match LXST-kt's buffering strategy. Interop test suite confirms zero underruns with ±100ms jitter at these settings. Also adds tests/interop/ with 48 Python tests verifying wire format, codec round-trip, and pipeline compatibility between Pyxis, Python LXST, and LXST-kt implementations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
364 lines
11 KiB
Python
364 lines
11 KiB
Python
"""
|
|
Shared fixtures for LXST interop tests.
|
|
|
|
These tests verify wire-format and codec compatibility between:
|
|
- Pyxis (ESP32 C++ implementation)
|
|
- Python LXST reference implementation
|
|
- LXST-kt (Kotlin/Android) — same wire format as Python LXST
|
|
"""
|
|
|
|
import sys
|
|
import math
|
|
import struct
|
|
import numpy as np
|
|
import pytest
|
|
|
|
# Add Python LXST to path
|
|
sys.path.insert(0, "$HOME/repos/public/LXST")
|
|
|
|
import pycodec2
|
|
from RNS.vendor import umsgpack as msgpack
|
|
|
|
# ── Constants matching all three implementations ──
|
|
|
|
# Msgpack field keys (Network.py, UIManager.cpp, PacketRouter.kt)
|
|
FIELD_SIGNALLING = 0x00
|
|
FIELD_FRAMES = 0x01
|
|
|
|
# Codec type header bytes (Codecs/__init__.py, Packetizer.kt, UIManager.h)
|
|
CODEC_NULL = 0xFF
|
|
CODEC_RAW = 0x00
|
|
CODEC_OPUS = 0x01
|
|
CODEC_CODEC2 = 0x02
|
|
|
|
# Codec2 mode headers (Codec2.py, Codec2.kt, codec_wrapper.cpp)
|
|
MODE_HEADERS = {
|
|
700: 0x00, 1200: 0x01, 1300: 0x02, 1400: 0x03,
|
|
1600: 0x04, 2400: 0x05, 3200: 0x06,
|
|
}
|
|
HEADER_MODES = {v: k for k, v in MODE_HEADERS.items()}
|
|
|
|
# Codec2 library mode mapping (matches codec_wrapper.cpp headerToLibraryMode)
|
|
MODE_TO_LIBRARY = {
|
|
3200: 0, 2400: 1, 1600: 2, 1400: 3, 1300: 4, 1200: 5, 700: 8,
|
|
}
|
|
|
|
# Signalling constants (UIManager.h, Signalling in LXST-kt)
|
|
STATUS_BUSY = 0x00
|
|
STATUS_REJECTED = 0x01
|
|
STATUS_CALLING = 0x02
|
|
STATUS_AVAILABLE = 0x03
|
|
STATUS_RINGING = 0x04
|
|
STATUS_CONNECTING = 0x05
|
|
STATUS_ESTABLISHED = 0x06
|
|
PREFERRED_PROFILE = 0xFF
|
|
|
|
# Profile IDs (Profile.kt)
|
|
PROFILE_ULBW = 0x10
|
|
PROFILE_VLBW = 0x20
|
|
PROFILE_LBW = 0x30
|
|
PROFILE_MQ = 0x40
|
|
|
|
|
|
@pytest.fixture
|
|
def codec2_3200():
|
|
"""Create a pycodec2 instance for 3200 bps mode."""
|
|
return pycodec2.Codec2(3200)
|
|
|
|
|
|
@pytest.fixture
|
|
def codec2_1600():
|
|
"""Create a pycodec2 instance for 1600 bps mode."""
|
|
return pycodec2.Codec2(1600)
|
|
|
|
|
|
@pytest.fixture
|
|
def lxst_codec2_3200():
|
|
"""Create a Python LXST Codec2 instance for 3200 bps."""
|
|
from LXST.Codecs.Codec2 import Codec2
|
|
return Codec2(mode=3200)
|
|
|
|
|
|
@pytest.fixture
|
|
def lxst_codec2_1600():
|
|
"""Create a Python LXST Codec2 instance for 1600 bps."""
|
|
from LXST.Codecs.Codec2 import Codec2
|
|
return Codec2(mode=1600)
|
|
|
|
|
|
@pytest.fixture
|
|
def sine_8khz_200ms():
|
|
"""Generate 200ms of 440Hz sine wave at 8kHz sample rate (int16)."""
|
|
sr = 8000
|
|
duration = 0.2
|
|
t = np.arange(int(sr * duration)) / sr
|
|
samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16)
|
|
return samples
|
|
|
|
|
|
@pytest.fixture
|
|
def sine_8khz_1s():
|
|
"""Generate 1 second of 440Hz sine wave at 8kHz sample rate (int16)."""
|
|
sr = 8000
|
|
duration = 1.0
|
|
t = np.arange(int(sr * duration)) / sr
|
|
samples = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16)
|
|
return samples
|
|
|
|
|
|
def codec2_mode_header(codec2):
|
|
"""
|
|
Get the wire-format mode header byte for a pycodec2 instance.
|
|
pycodec2 doesn't expose the mode directly, so we infer from bytes_per_frame
|
|
and samples_per_frame.
|
|
"""
|
|
spf = codec2.samples_per_frame()
|
|
bpf = codec2.bytes_per_frame()
|
|
# Map (spf, bpf) to mode header
|
|
KNOWN_PARAMS = {
|
|
(160, 8): 0x06, # 3200
|
|
(160, 8): 0x06, # 3200
|
|
(320, 8): 0x04, # 1600
|
|
(320, 7): 0x03, # 1400
|
|
(320, 7): 0x02, # 1300 (same bpf as 1400 but different params)
|
|
(320, 6): 0x01, # 1200
|
|
(320, 4): 0x00, # 700C
|
|
}
|
|
# This is ambiguous for some modes. Use a different approach:
|
|
# Try each mode and check SPF match
|
|
for mode, header in MODE_HEADERS.items():
|
|
lib_mode = MODE_TO_LIBRARY[mode]
|
|
test_codec = pycodec2.Codec2(mode)
|
|
if test_codec.samples_per_frame() == spf and test_codec.bytes_per_frame() == bpf:
|
|
# Could be multiple matches (1300 vs 1400 both have spf=320, bpf=7)
|
|
# Return first match — caller should ensure they pass the right codec
|
|
return header
|
|
raise ValueError(f"Unknown codec2 params: spf={spf} bpf={bpf}")
|
|
|
|
|
|
def encode_codec2_subframes(codec2, pcm_int16, mode_header=None):
|
|
"""
|
|
Encode PCM samples into individual Codec2 sub-frames.
|
|
|
|
Simulates Pyxis i2s_capture.cpp + codec_wrapper.cpp:
|
|
Each encode produces [mode_header(1)] + [raw_encoded(N)] bytes.
|
|
|
|
pycodec2.encode() returns only raw bytes, so we prepend the mode header
|
|
to match what Pyxis codec_wrapper.cpp does.
|
|
|
|
Returns list of (1 + bytes_per_frame)-byte encoded frames.
|
|
"""
|
|
spf = codec2.samples_per_frame()
|
|
bpf = codec2.bytes_per_frame()
|
|
n_frames = len(pcm_int16) // spf
|
|
|
|
if mode_header is None:
|
|
mode_header = codec2_mode_header(codec2)
|
|
|
|
frames = []
|
|
for i in range(n_frames):
|
|
chunk = pcm_int16[i * spf:(i + 1) * spf]
|
|
raw_encoded = codec2.encode(chunk)
|
|
# Prepend mode header (matches Pyxis codec_wrapper.cpp encode())
|
|
frame = bytes([mode_header]) + raw_encoded
|
|
frames.append(frame)
|
|
return frames
|
|
|
|
|
|
def batch_subframes_pyxis_style(subframes, mode_header):
|
|
"""
|
|
Batch multiple Codec2 sub-frames into Pyxis wire format.
|
|
|
|
Simulates UIManager.cpp call_send_audio_batch TX batching:
|
|
- First frame: keep [mode_header] + [data]
|
|
- Subsequent frames: strip mode header, append raw data only
|
|
- Result: [mode_header(1)] + [N * bytes_per_frame]
|
|
|
|
Returns the batch_data bytes (without codec_type prefix).
|
|
"""
|
|
if not subframes:
|
|
return b""
|
|
|
|
# First frame keeps header + data
|
|
batch = bytearray(subframes[0])
|
|
|
|
# Subsequent frames: strip 1-byte mode header
|
|
for frame in subframes[1:]:
|
|
batch.extend(frame[1:]) # skip mode header byte
|
|
|
|
return bytes(batch)
|
|
|
|
|
|
def build_pyxis_audio_packet(batch_data):
|
|
"""
|
|
Build a complete Pyxis-format wire packet for audio.
|
|
|
|
Simulates UIManager.cpp call_send_audio_batch:
|
|
{0x01: bin8([codec_type(0x02)] + batch_data)}
|
|
|
|
Where batch_data = [mode_header] + [N * raw_codec2_bytes]
|
|
"""
|
|
# Prepend codec type byte
|
|
frame_bytes = bytes([CODEC_CODEC2]) + batch_data
|
|
packet_data = {FIELD_FRAMES: frame_bytes}
|
|
return msgpack.packb(packet_data)
|
|
|
|
|
|
def build_columba_audio_packet(codec_type_byte, encoded_frame):
|
|
"""
|
|
Build a Columba/LXST-kt format wire packet for audio.
|
|
|
|
Simulates Packetizer.kt + Python LXST Packetizer:
|
|
{0x01: bytes([codec_type] + encoded_frame)}
|
|
|
|
Where encoded_frame = [mode_header] + [N * raw_codec2_bytes]
|
|
"""
|
|
frame_bytes = bytes([codec_type_byte]) + encoded_frame
|
|
packet_data = {FIELD_FRAMES: frame_bytes}
|
|
return msgpack.packb(packet_data)
|
|
|
|
|
|
def build_signal_packet(signal_value):
|
|
"""
|
|
Build a signalling packet matching all implementations.
|
|
|
|
Format: {0x00: [signal_value]}
|
|
"""
|
|
packet_data = {FIELD_SIGNALLING: [signal_value]}
|
|
return msgpack.packb(packet_data)
|
|
|
|
|
|
def parse_pyxis_rx(wire_bytes):
|
|
"""
|
|
Simulate Pyxis call_on_packet() parsing.
|
|
|
|
Parses the raw msgpack bytes exactly as UIManager.cpp does:
|
|
- Check for fixmap(1): 0x81
|
|
- Field 0x00 = signalling, field 0x01 = audio
|
|
- Audio: bin8/bin16 → single frame, fixarray → batched frames
|
|
- Returns list of (codec_type, codec_data) tuples for audio frames
|
|
- Returns list of signal values for signalling packets
|
|
"""
|
|
buf = wire_bytes
|
|
if len(buf) < 4:
|
|
return {"error": "packet too short"}
|
|
|
|
if buf[0] != 0x81:
|
|
return {"error": f"expected fixmap(1), got 0x{buf[0]:02x}"}
|
|
|
|
field = buf[1]
|
|
result = {"field": field, "frames": [], "signals": []}
|
|
|
|
if field == FIELD_SIGNALLING:
|
|
# {0x00: [signal]}
|
|
if buf[2] != 0x91:
|
|
return {"error": f"expected fixarray(1), got 0x{buf[2]:02x}"}
|
|
if buf[3] <= 0x7F:
|
|
result["signals"].append(buf[3])
|
|
elif buf[3] == 0xCC and len(buf) >= 5:
|
|
result["signals"].append(buf[4])
|
|
elif buf[3] == 0xCD and len(buf) >= 6:
|
|
result["signals"].append((buf[4] << 8) | buf[5])
|
|
else:
|
|
return {"error": f"unparseable signal 0x{buf[3]:02x}"}
|
|
|
|
elif field == FIELD_FRAMES:
|
|
fmt = buf[2]
|
|
if (fmt & 0xF0) == 0x90:
|
|
# fixarray: batched frames
|
|
array_len = fmt & 0x0F
|
|
pos = 3
|
|
for _ in range(array_len):
|
|
if pos >= len(buf):
|
|
break
|
|
if buf[pos] == 0xC4:
|
|
frame_len = buf[pos + 1]
|
|
frame_start = pos + 2
|
|
elif buf[pos] == 0xC5:
|
|
frame_len = (buf[pos + 1] << 8) | buf[pos + 2]
|
|
frame_start = pos + 3
|
|
else:
|
|
break
|
|
if frame_start + frame_len > len(buf) or frame_len < 2:
|
|
break
|
|
frame = buf[frame_start:frame_start + frame_len]
|
|
codec_type = frame[0]
|
|
codec_data = frame[1:]
|
|
result["frames"].append((codec_type, bytes(codec_data)))
|
|
pos = frame_start + frame_len
|
|
|
|
elif fmt == 0xC4:
|
|
# bin8: single frame
|
|
if len(buf) < 5:
|
|
return {"error": "bin8 too short"}
|
|
frame_len = buf[3]
|
|
if len(buf) < 4 + frame_len or frame_len < 2:
|
|
return {"error": "bin8 frame truncated"}
|
|
frame = buf[4:4 + frame_len]
|
|
codec_type = frame[0]
|
|
codec_data = frame[1:]
|
|
result["frames"].append((codec_type, bytes(codec_data)))
|
|
|
|
elif fmt == 0xC5:
|
|
# bin16: single frame
|
|
if len(buf) < 6:
|
|
return {"error": "bin16 too short"}
|
|
frame_len = (buf[3] << 8) | buf[4]
|
|
if len(buf) < 5 + frame_len or frame_len < 2:
|
|
return {"error": "bin16 frame truncated"}
|
|
frame = buf[5:5 + frame_len]
|
|
codec_type = frame[0]
|
|
codec_data = frame[1:]
|
|
result["frames"].append((codec_type, bytes(codec_data)))
|
|
|
|
return result
|
|
|
|
|
|
def parse_lxst_python_rx(wire_bytes):
|
|
"""
|
|
Simulate Python LXST LinkSource._packet() parsing.
|
|
|
|
This is the actual reference implementation logic from Network.py.
|
|
Returns list of (codec_type_int, raw_codec_data_bytes) tuples.
|
|
"""
|
|
unpacked = msgpack.unpackb(wire_bytes)
|
|
results = {"frames": [], "signals": []}
|
|
|
|
if not isinstance(unpacked, dict):
|
|
return {"error": "not a dict"}
|
|
|
|
if FIELD_FRAMES in unpacked:
|
|
frames = unpacked[FIELD_FRAMES]
|
|
if not isinstance(frames, list):
|
|
frames = [frames]
|
|
for frame in frames:
|
|
codec_type_byte = frame[0]
|
|
codec_data = frame[1:]
|
|
results["frames"].append((codec_type_byte, bytes(codec_data)))
|
|
|
|
if FIELD_SIGNALLING in unpacked:
|
|
signals = unpacked[FIELD_SIGNALLING]
|
|
if not isinstance(signals, list):
|
|
signals = [signals]
|
|
results["signals"] = signals
|
|
|
|
return results
|
|
|
|
|
|
def snr_db(original, reconstructed):
|
|
"""Calculate Signal-to-Noise Ratio in dB between original and reconstructed signals."""
|
|
# Align lengths
|
|
min_len = min(len(original), len(reconstructed))
|
|
orig = original[:min_len].astype(np.float64)
|
|
recon = reconstructed[:min_len].astype(np.float64)
|
|
|
|
signal_power = np.mean(orig ** 2)
|
|
noise_power = np.mean((orig - recon) ** 2)
|
|
|
|
if noise_power == 0:
|
|
return float("inf")
|
|
if signal_power == 0:
|
|
return 0.0
|
|
|
|
return 10 * math.log10(signal_power / noise_power)
|