Files
pyxis/lib/ble_interface/BLEReassembler.cpp
torlando-agent[bot] 70d4aa6be9 feat: graft pyxis onto upstream microReticulum 0.4.1
Repins microReticulum + microLXMF onto the upstream-0.4.1 graft and adapts
pyxis to the new src/microReticulum/ layout and 0.4.x APIs. The far-diverged
0.3.0 fork's Resource/Transport/Identity work is subsumed by upstream's
reimplementation; only the still-needed fixes ride on the pinned branches
(PKCS7/HMAC/X25519 crypto -- proven byte-identical to python RNS 1.3.1 --
Packet link-proof callback, Identity short-sig guard, and the bz2 layer +
decompress-on-receive in Resource::assemble()).

Consumer-side changes:
- platformio.ini: pin microReticulum @2f21fee (pyxis-fixes-on-0.4.1) and
  microLXMF @33760d0 (chore/microreticulum-0.4.1-layout); bump microStore
  ceea8f5 -> c5fb69d (0.4.x requires the new BasicFileStore::init API);
  -std=gnu++11 -> gnu++17 (upstream requires C++17).
- Namespace all microReticulum includes (angle + quote) to <microReticulum/...>
  for the relocated layout; shim-local Utilities/Stream.h|Print.h preserved.
- Interface::send_outgoing now returns bool: update TCP/BLE/SX1262/Auto
  overrides with correct success/failure returns.
- SDArchiveFileSystem::init(bool reformatOnFail=true) to match new microStore.
- Static Transport::get_path_table() -> path_table(); instance getter unchanged.
- Remove duplicate shim Cryptography/BZ2 (microReticulum provides it now; keep
  lib/libbz2 as the ESP32 bzlib provider).
- patch_littlefs_paths.py: normalize microStore's LittleFS adapter paths to a
  leading "/" -- ESP32 Arduino LittleFS rejects "./"-prefixed paths, which
  silently broke the path store (no peer paths learned, all messaging blocked).

Validated on T-Deck Plus: builds (RAM 27.5% / Flash 77.7%), boots stable
(no WDT/panic), and a full on-device LXMF e2e (DIRECT + OPPORTUNISTIC +
bz2-compressed-Resource receive) passes 5/5.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWZuYkHBRqNb6BZHV8sTG5
2026-06-19 15:49:44 -04:00

327 lines
10 KiB
C++

/**
* @file BLEReassembler.cpp
* @brief BLE-Reticulum Protocol v2.2 fragment reassembler implementation
*
* Uses fixed-size pools instead of STL containers to eliminate heap fragmentation.
*/
#include "BLEReassembler.h"
#include <microReticulum/Log.h>
namespace RNS { namespace BLE {
BLEReassembler::BLEReassembler() {
// Default timeout from protocol spec
_timeout_seconds = Timing::REASSEMBLY_TIMEOUT;
// Initialize pool
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
_pending_pool[i].clear();
}
}
BLEReassembler::PendingReassemblySlot* BLEReassembler::findSlot(const Bytes& peer_identity) {
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use && _pending_pool[i].transfer_id == peer_identity) {
return &_pending_pool[i];
}
}
return nullptr;
}
const BLEReassembler::PendingReassemblySlot* BLEReassembler::findSlot(const Bytes& peer_identity) const {
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use && _pending_pool[i].transfer_id == peer_identity) {
return &_pending_pool[i];
}
}
return nullptr;
}
BLEReassembler::PendingReassemblySlot* BLEReassembler::allocateSlot(const Bytes& peer_identity) {
// First check if slot already exists for this peer
PendingReassemblySlot* existing = findSlot(peer_identity);
if (existing) {
return existing;
}
// Find a free slot
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (!_pending_pool[i].in_use) {
_pending_pool[i].in_use = true;
_pending_pool[i].transfer_id = peer_identity;
return &_pending_pool[i];
}
}
return nullptr; // Pool is full
}
size_t BLEReassembler::pendingCount() const {
size_t count = 0;
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use) {
count++;
}
}
return count;
}
void BLEReassembler::setReassemblyCallback(ReassemblyCallback callback) {
_reassembly_callback = callback;
}
void BLEReassembler::setTimeoutCallback(TimeoutCallback callback) {
_timeout_callback = callback;
}
void BLEReassembler::setTimeout(double timeout_seconds) {
_timeout_seconds = timeout_seconds;
}
bool BLEReassembler::processFragment(const Bytes& peer_identity, const Bytes& fragment) {
// Validate fragment
if (!BLEFragmenter::isValidFragment(fragment)) {
TRACE("BLEReassembler: Invalid fragment header");
return false;
}
// Parse header
Fragment::Type type;
uint16_t sequence;
uint16_t total_fragments;
if (!BLEFragmenter::parseHeader(fragment, type, sequence, total_fragments)) {
TRACE("BLEReassembler: Failed to parse fragment header");
return false;
}
double now = Utilities::OS::time();
// Handle START fragment - begins a new reassembly
if (type == Fragment::START) {
// Clear any existing incomplete reassembly for this peer
PendingReassemblySlot* existing = findSlot(peer_identity);
if (existing) {
TRACE("BLEReassembler: Discarding incomplete reassembly for new START");
existing->clear();
}
// Start new reassembly
if (!startReassembly(peer_identity, total_fragments)) {
return false;
}
}
// Look up pending reassembly
PendingReassemblySlot* slot = findSlot(peer_identity);
if (!slot) {
// No pending reassembly and this isn't a START
if (type != Fragment::START) {
// For single-fragment packets (type=END, total=1, seq=0), start immediately
if (type == Fragment::END && total_fragments == 1 && sequence == 0) {
if (!startReassembly(peer_identity, total_fragments)) {
return false;
}
slot = findSlot(peer_identity);
} else {
TRACE("BLEReassembler: Received fragment without START, discarding");
return false;
}
} else {
slot = findSlot(peer_identity);
}
}
if (!slot) {
ERROR("BLEReassembler: Failed to find/create reassembly session");
return false;
}
PendingReassembly& reassembly = slot->reassembly;
// Validate total_fragments matches
if (total_fragments != reassembly.total_fragments) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Fragment total mismatch, expected %u got %u",
reassembly.total_fragments, total_fragments);
TRACE(buf);
return false;
}
// Validate sequence is in range
if (sequence >= reassembly.total_fragments) {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Sequence out of range: %u", sequence);
TRACE(buf);
return false;
}
// Check for duplicate
if (reassembly.fragments[sequence].received) {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Duplicate fragment %u", sequence);
TRACE(buf);
// Still update last_activity to keep session alive
reassembly.last_activity = now;
return true; // Not an error, just duplicate
}
// Store fragment payload into fixed-size buffer
Bytes payload = BLEFragmenter::extractPayload(fragment);
if (payload.size() > MAX_FRAGMENT_PAYLOAD_SIZE) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Fragment payload too large: %zu > %zu",
payload.size(), MAX_FRAGMENT_PAYLOAD_SIZE);
WARNING(buf);
return false;
}
memcpy(reassembly.fragments[sequence].data, payload.data(), payload.size());
reassembly.fragments[sequence].data_size = payload.size();
reassembly.fragments[sequence].received = true;
reassembly.received_count++;
reassembly.last_activity = now;
{
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Received fragment %u/%u", sequence + 1, reassembly.total_fragments);
TRACE(buf);
}
// Check if complete
if (reassembly.received_count == reassembly.total_fragments) {
// Assemble complete packet
Bytes complete_packet = assembleFragments(reassembly);
{
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Completed reassembly, %zu bytes", complete_packet.size());
TRACE(buf);
}
// Remove from pending before callback (callback might trigger new data)
Bytes identity_copy = reassembly.peer_identity;
slot->clear();
// Invoke callback
if (_reassembly_callback) {
_reassembly_callback(identity_copy, complete_packet);
}
}
return true;
}
void BLEReassembler::checkTimeouts() {
double now = Utilities::OS::time();
// Find and clean up expired reassemblies
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (!_pending_pool[i].in_use) {
continue;
}
PendingReassembly& reassembly = _pending_pool[i].reassembly;
double age = now - reassembly.started_at;
if (age > _timeout_seconds) {
{
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Timeout waiting for fragments, received %u/%u",
reassembly.received_count, reassembly.total_fragments);
WARNING(buf);
}
// Copy identity before clearing
Bytes peer_identity = _pending_pool[i].transfer_id;
// Clear the slot
_pending_pool[i].clear();
// Invoke timeout callback after clearing (callback might start new reassembly)
if (_timeout_callback) {
_timeout_callback(peer_identity, "Reassembly timeout");
}
}
}
}
void BLEReassembler::clearForPeer(const Bytes& peer_identity) {
PendingReassemblySlot* slot = findSlot(peer_identity);
if (slot) {
TRACE("BLEReassembler: Clearing pending reassembly for peer");
slot->clear();
}
}
void BLEReassembler::clearAll() {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Clearing all pending reassemblies (%zu sessions)", pendingCount());
TRACE(buf);
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
_pending_pool[i].clear();
}
}
bool BLEReassembler::hasPending(const Bytes& peer_identity) const {
return findSlot(peer_identity) != nullptr;
}
bool BLEReassembler::startReassembly(const Bytes& peer_identity, uint16_t total_fragments) {
// Validate fragment count fits in fixed-size array
if (total_fragments > MAX_FRAGMENTS_PER_REASSEMBLY) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Too many fragments: %u > %zu",
total_fragments, MAX_FRAGMENTS_PER_REASSEMBLY);
WARNING(buf);
return false;
}
// Allocate a slot (reuses existing or finds free)
PendingReassemblySlot* slot = allocateSlot(peer_identity);
if (!slot) {
WARNING("BLEReassembler: Pool full, cannot start new reassembly");
return false;
}
double now = Utilities::OS::time();
// Initialize the reassembly state
PendingReassembly& reassembly = slot->reassembly;
reassembly.clear(); // Clear any old data
reassembly.peer_identity = peer_identity;
reassembly.total_fragments = total_fragments;
reassembly.received_count = 0;
reassembly.started_at = now;
reassembly.last_activity = now;
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Starting reassembly for %u fragments", total_fragments);
TRACE(buf);
return true;
}
Bytes BLEReassembler::assembleFragments(const PendingReassembly& reassembly) {
// Calculate total size
size_t total_size = 0;
for (size_t i = 0; i < reassembly.total_fragments; i++) {
total_size += reassembly.fragments[i].data_size;
}
// Allocate result buffer
Bytes result(total_size);
uint8_t* ptr = result.writable(total_size);
result.resize(total_size);
// Concatenate fragments in order
size_t offset = 0;
for (size_t i = 0; i < reassembly.total_fragments; i++) {
const FragmentInfo& frag = reassembly.fragments[i];
if (frag.data_size > 0) {
memcpy(ptr + offset, frag.data, frag.data_size);
offset += frag.data_size;
}
}
return result;
}
}} // namespace RNS::BLE