Files
pyxis/tests/interop/test_wire_format.py
torlando-tech 6e47cb808b Increase playback buffer for jitter-free LXST RX audio
PCM_RING_FRAMES 16→50 (320ms→1000ms capacity) and
PREBUFFER_FRAMES 3→15 (60ms→300ms prebuffer) to match
LXST-kt's buffering strategy. Interop test suite confirms
zero underruns with ±100ms jitter at these settings.

Also adds tests/interop/ with 48 Python tests verifying
wire format, codec round-trip, and pipeline compatibility
between Pyxis, Python LXST, and LXST-kt implementations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 10:57:07 -05:00

347 lines
14 KiB
Python

"""
Wire format interop tests.
Verifies that packets constructed by Pyxis can be parsed by Python LXST
(and vice versa), ensuring msgpack encoding/decoding compatibility.
Tests cover:
- Single audio frame packets (bin8)
- Batched audio frame packets (Pyxis TX style)
- Signalling packets (fixint, uint8, uint16)
- Profile negotiation signals
- Edge cases (empty, max size, etc.)
"""
import struct
import numpy as np
import pycodec2
import pytest
from conftest import (
CODEC_CODEC2, CODEC_OPUS, FIELD_FRAMES, FIELD_SIGNALLING,
MODE_HEADERS, HEADER_MODES,
STATUS_AVAILABLE, STATUS_RINGING, STATUS_ESTABLISHED,
PREFERRED_PROFILE, PROFILE_LBW, PROFILE_MQ,
build_pyxis_audio_packet, build_columba_audio_packet,
build_signal_packet, parse_pyxis_rx, parse_lxst_python_rx,
encode_codec2_subframes, batch_subframes_pyxis_style,
)
from RNS.vendor import umsgpack as msgpack
class TestPyxisTxFormat:
"""Test that Pyxis-format packets can be parsed by Python LXST."""
def test_single_subframe_packet(self, codec2_3200, sine_8khz_200ms):
"""A single 20ms Codec2 sub-frame as bin8 — simplest case."""
spf = codec2_3200.samples_per_frame() # 160
pcm = sine_8khz_200ms[:spf] # first 160 samples
encoded = codec2_3200.encode(pcm) # 8 bytes raw
# Build batch with 1 frame: [mode_header(0x06)] + [8 bytes]
batch = bytes([MODE_HEADERS[3200]]) + encoded
wire = build_pyxis_audio_packet(batch)
# Parse with Python LXST logic
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
assert codec_data[0] == MODE_HEADERS[3200] # mode header
assert len(codec_data) == 1 + 8 # mode_header + 1 sub-frame
def test_10_subframe_batch_lbw(self, codec2_3200, sine_8khz_200ms):
"""
10 sub-frames batched (200ms = 1 LXST LBW "frame").
This is the most common Pyxis TX packet size.
"""
subframes = encode_codec2_subframes(codec2_3200, sine_8khz_200ms[:1600])
assert len(subframes) == 10
batch = batch_subframes_pyxis_style(subframes, MODE_HEADERS[3200])
wire = build_pyxis_audio_packet(batch)
# Parse with Python LXST
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1 # single bin8, not array
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
# Verify codec_data structure: [mode_header] + [10 * 8 bytes]
assert codec_data[0] == MODE_HEADERS[3200]
assert len(codec_data) == 1 + 10 * 8 # 81 bytes
def test_25_subframe_batch(self, codec2_3200, sine_8khz_1s):
"""25 sub-frames batched (500ms) — Pyxis adaptive batching max."""
subframes = encode_codec2_subframes(codec2_3200, sine_8khz_1s[:4000])
assert len(subframes) == 25
batch = batch_subframes_pyxis_style(subframes, MODE_HEADERS[3200])
wire = build_pyxis_audio_packet(batch)
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
assert len(codec_data) == 1 + 25 * 8 # 201 bytes
def test_pyxis_packet_also_parseable_by_pyxis_rx(self, codec2_3200, sine_8khz_200ms):
"""Pyxis packets should also be parseable by our own RX parser."""
subframes = encode_codec2_subframes(codec2_3200, sine_8khz_200ms[:1600])
batch = batch_subframes_pyxis_style(subframes, MODE_HEADERS[3200])
wire = build_pyxis_audio_packet(batch)
result = parse_pyxis_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
assert len(codec_data) == 1 + 10 * 8
def test_lxst_python_can_decode_pyxis_batch(self, codec2_3200, lxst_codec2_3200, sine_8khz_200ms):
"""
Full decode test: Python LXST Codec2 can decode a Pyxis batched packet.
This is THE critical interop test — it verifies that the codec data
extracted from a Pyxis wire packet can be decoded by Python LXST's
Codec2.decode() method.
"""
# Encode 10 sub-frames with pycodec2
pcm = sine_8khz_200ms[:1600]
subframes = encode_codec2_subframes(codec2_3200, pcm)
batch = batch_subframes_pyxis_style(subframes, MODE_HEADERS[3200])
# Build wire packet and parse
wire = build_pyxis_audio_packet(batch)
result = parse_lxst_python_rx(wire)
codec_type, codec_data = result["frames"][0]
# Decode with Python LXST Codec2
# codec_data = [mode_header] + [10 * 8 bytes raw codec2]
decoded = lxst_codec2_3200.decode(codec_data)
assert decoded.shape[0] == 1600 # 10 * 160 samples
assert decoded.shape[1] == 1 # mono
# Verify audio is not silence
max_amp = np.max(np.abs(decoded))
assert max_amp > 0.01, f"Decoded audio is near-silence: max_amp={max_amp}"
class TestColumbaTxFormat:
"""Test that Columba/LXST-kt format packets can be parsed by Pyxis."""
def test_single_lbw_frame(self, codec2_3200, sine_8khz_200ms):
"""
Columba LBW: 200ms = 1600 samples → Codec2 encode → 81 bytes.
Wire: {0x01: bin8([0x02, 0x06, 80 bytes])} = 86 bytes total.
"""
pcm = sine_8khz_200ms[:1600]
# Encode like LXST-kt Codec2.encode(): all 1600 samples at once
spf = codec2_3200.samples_per_frame()
bpf = codec2_3200.bytes_per_frame()
n_frames = len(pcm) // spf
encoded = bytes([MODE_HEADERS[3200]])
for i in range(n_frames):
chunk = pcm[i * spf:(i + 1) * spf]
encoded += codec2_3200.encode(chunk)
assert len(encoded) == 1 + 10 * 8 # 81 bytes
# Build Columba wire packet
wire = build_columba_audio_packet(CODEC_CODEC2, encoded)
# Parse with Pyxis RX
result = parse_pyxis_rx(wire)
assert "error" not in result
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
assert codec_data[0] == MODE_HEADERS[3200]
assert len(codec_data) == 81
def test_columba_packet_decoded_by_pyxis_codec(self, codec2_3200, sine_8khz_200ms):
"""
Full decode: Pyxis Codec2 decode logic can handle Columba's frame.
Simulates what i2s_playback.cpp writeEncodedPacket() does:
calls codec_->decode(codec_data, len) which reads mode header,
then decodes N sub-frames.
"""
pcm = sine_8khz_200ms[:1600]
spf = codec2_3200.samples_per_frame()
bpf = codec2_3200.bytes_per_frame()
n_frames = len(pcm) // spf
# Encode (Columba style)
mode_header = bytes([MODE_HEADERS[3200]])
raw_encoded = b""
for i in range(n_frames):
chunk = pcm[i * spf:(i + 1) * spf]
raw_encoded += codec2_3200.encode(chunk)
encoded = mode_header + raw_encoded
# Build wire packet and parse like Pyxis
wire = build_columba_audio_packet(CODEC_CODEC2, encoded)
result = parse_pyxis_rx(wire)
codec_type, codec_data = result["frames"][0]
# Decode like Pyxis codec_wrapper.cpp:
# codec_data[0] = mode header, codec_data[1:] = raw sub-frames
header = codec_data[0]
assert header == MODE_HEADERS[3200]
raw_data = codec_data[1:]
assert len(raw_data) == n_frames * bpf
# Decode each sub-frame (simulating Codec2Wrapper::decode)
# Use a fresh codec2 instance (simulating the decoder on Pyxis)
decoder = pycodec2.Codec2(3200)
decoded_pcm = np.zeros(n_frames * spf, dtype=np.int16)
for i in range(n_frames):
frame_data = raw_data[i * bpf:(i + 1) * bpf]
decoded = decoder.decode(frame_data)
decoded_pcm[i * spf:(i + 1) * spf] = decoded
# Verify non-silence
assert np.max(np.abs(decoded_pcm)) > 100
def test_columba_also_parseable_by_lxst_python(self, codec2_3200, sine_8khz_200ms):
"""Columba packets should also be parseable by Python LXST (self-interop)."""
pcm = sine_8khz_200ms[:1600]
spf = codec2_3200.samples_per_frame()
n_frames = len(pcm) // spf
encoded = bytes([MODE_HEADERS[3200]])
for i in range(n_frames):
chunk = pcm[i * spf:(i + 1) * spf]
encoded += codec2_3200.encode(chunk)
wire = build_columba_audio_packet(CODEC_CODEC2, encoded)
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
class TestSignallingFormat:
"""Test signalling packet format compatibility."""
def test_fixint_signal(self):
"""Signals 0x00-0x06 use fixint encoding (single byte)."""
for signal in [STATUS_AVAILABLE, STATUS_RINGING, STATUS_ESTABLISHED]:
wire = build_signal_packet(signal)
# Both parsers should decode
pyxis_result = parse_pyxis_rx(wire)
lxst_result = parse_lxst_python_rx(wire)
assert pyxis_result["signals"] == [signal]
assert lxst_result["signals"] == [signal]
def test_uint8_signal(self):
"""Signals 128-255 use uint8 encoding (0xCC XX)."""
signal = PREFERRED_PROFILE # 0xFF
wire = build_signal_packet(signal)
pyxis_result = parse_pyxis_rx(wire)
lxst_result = parse_lxst_python_rx(wire)
assert pyxis_result["signals"] == [signal]
assert lxst_result["signals"] == [signal]
def test_profile_negotiation_signal(self):
"""PREFERRED_PROFILE + profile_id uses uint16 encoding (0xCD XX XX)."""
signal = PREFERRED_PROFILE + PROFILE_LBW # 0xFF + 0x30 = 0x12F (303)
wire = build_signal_packet(signal)
pyxis_result = parse_pyxis_rx(wire)
lxst_result = parse_lxst_python_rx(wire)
assert pyxis_result["signals"] == [signal]
assert lxst_result["signals"] == [signal]
def test_pyxis_manual_signal_construction(self):
"""
Verify Pyxis call_send_signal() manual msgpack matches standard msgpack.
Pyxis constructs signal packets by hand (not using a msgpack library):
fixint: [0x81, 0x00, 0x91, signal]
uint8: [0x81, 0x00, 0x91, 0xCC, signal]
uint16: [0x81, 0x00, 0x91, 0xCD, hi, lo]
"""
# fixint range (0-127)
for signal in [0x00, 0x06, 0x7F]:
manual = bytes([0x81, 0x00, 0x91, signal])
standard = build_signal_packet(signal)
assert manual == standard, f"Mismatch for fixint signal 0x{signal:02x}"
# uint8 range (128-255) — only 0xFF used in practice
signal = 0xFF
manual = bytes([0x81, 0x00, 0x91, 0xCC, signal])
standard = build_signal_packet(signal)
assert manual == standard
# uint16 range (256+) — profile negotiation
signal = 0x12F # PREFERRED_PROFILE + LBW
hi = (signal >> 8) & 0xFF
lo = signal & 0xFF
manual = bytes([0x81, 0x00, 0x91, 0xCD, hi, lo])
standard = build_signal_packet(signal)
assert manual == standard
class TestWireFormatEdgeCases:
"""Edge cases and boundary conditions."""
def test_max_bin8_payload(self, codec2_3200, sine_8khz_1s):
"""Test maximum bin8 payload (255 bytes)."""
# 30 sub-frames: [mode(1)] + [30*8] = 241 bytes + codec_type(1) = 242 bytes
subframes = encode_codec2_subframes(codec2_3200, sine_8khz_1s[:4800])
batch = batch_subframes_pyxis_style(subframes[:30], MODE_HEADERS[3200])
wire = build_pyxis_audio_packet(batch)
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
# mode_header + 30 * 8 bytes = 241 bytes
assert len(codec_data) == 1 + 30 * 8
# Also parseable by Pyxis
result2 = parse_pyxis_rx(wire)
assert len(result2["frames"]) == 1
def test_codec2_1600_mode(self, codec2_1600, sine_8khz_200ms):
"""Test VLBW profile with Codec2 1600 bps."""
spf = codec2_1600.samples_per_frame()
bpf = codec2_1600.bytes_per_frame()
pcm = sine_8khz_200ms[:spf * 8] # 8 sub-frames
subframes = encode_codec2_subframes(codec2_1600, pcm)
batch = batch_subframes_pyxis_style(subframes, MODE_HEADERS[1600])
wire = build_pyxis_audio_packet(batch)
result = parse_lxst_python_rx(wire)
assert len(result["frames"]) == 1
codec_type, codec_data = result["frames"][0]
assert codec_type == CODEC_CODEC2
assert codec_data[0] == MODE_HEADERS[1600] # 0x04
def test_raw_bytes_roundtrip(self):
"""Verify raw wire bytes match between manual and msgpack construction."""
# Build a simple packet manually (as Pyxis does in call_send_audio_batch)
batch_data = bytes([0x06]) + bytes(8) # mode_header + 8 zero bytes
codec_type = CODEC_CODEC2
total_len = 1 + len(batch_data) # codec_type + batch
manual_wire = bytes([
0x81, # fixmap(1)
0x01, # key: FIELD_FRAMES
0xC4, # bin8
total_len, # length
codec_type, # 0x02
]) + batch_data
standard_wire = build_pyxis_audio_packet(batch_data)
assert manual_wire == standard_wire