Files
pyxis/lib/ble_interface/BLEReassembler.cpp
torlando-tech ac6ceca9f8 Initial commit: standalone Pyxis T-Deck firmware
Split T-Deck firmware from microReticulum examples/lxmf_tdeck/ into its
own repo. microReticulum is consumed as a git submodule dependency pinned
to feat/t-deck. All include paths updated from relative symlinks to bare
includes resolved via library build flags.

Both tdeck (NimBLE) and tdeck-bluedroid environments compile successfully.
Licensed under AGPLv3.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 19:48:33 -05:00

327 lines
10 KiB
C++

/**
* @file BLEReassembler.cpp
* @brief BLE-Reticulum Protocol v2.2 fragment reassembler implementation
*
* Uses fixed-size pools instead of STL containers to eliminate heap fragmentation.
*/
#include "BLEReassembler.h"
#include "Log.h"
namespace RNS { namespace BLE {
BLEReassembler::BLEReassembler() {
// Default timeout from protocol spec
_timeout_seconds = Timing::REASSEMBLY_TIMEOUT;
// Initialize pool
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
_pending_pool[i].clear();
}
}
BLEReassembler::PendingReassemblySlot* BLEReassembler::findSlot(const Bytes& peer_identity) {
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use && _pending_pool[i].transfer_id == peer_identity) {
return &_pending_pool[i];
}
}
return nullptr;
}
const BLEReassembler::PendingReassemblySlot* BLEReassembler::findSlot(const Bytes& peer_identity) const {
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use && _pending_pool[i].transfer_id == peer_identity) {
return &_pending_pool[i];
}
}
return nullptr;
}
BLEReassembler::PendingReassemblySlot* BLEReassembler::allocateSlot(const Bytes& peer_identity) {
// First check if slot already exists for this peer
PendingReassemblySlot* existing = findSlot(peer_identity);
if (existing) {
return existing;
}
// Find a free slot
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (!_pending_pool[i].in_use) {
_pending_pool[i].in_use = true;
_pending_pool[i].transfer_id = peer_identity;
return &_pending_pool[i];
}
}
return nullptr; // Pool is full
}
size_t BLEReassembler::pendingCount() const {
size_t count = 0;
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (_pending_pool[i].in_use) {
count++;
}
}
return count;
}
void BLEReassembler::setReassemblyCallback(ReassemblyCallback callback) {
_reassembly_callback = callback;
}
void BLEReassembler::setTimeoutCallback(TimeoutCallback callback) {
_timeout_callback = callback;
}
void BLEReassembler::setTimeout(double timeout_seconds) {
_timeout_seconds = timeout_seconds;
}
bool BLEReassembler::processFragment(const Bytes& peer_identity, const Bytes& fragment) {
// Validate fragment
if (!BLEFragmenter::isValidFragment(fragment)) {
TRACE("BLEReassembler: Invalid fragment header");
return false;
}
// Parse header
Fragment::Type type;
uint16_t sequence;
uint16_t total_fragments;
if (!BLEFragmenter::parseHeader(fragment, type, sequence, total_fragments)) {
TRACE("BLEReassembler: Failed to parse fragment header");
return false;
}
double now = Utilities::OS::time();
// Handle START fragment - begins a new reassembly
if (type == Fragment::START) {
// Clear any existing incomplete reassembly for this peer
PendingReassemblySlot* existing = findSlot(peer_identity);
if (existing) {
TRACE("BLEReassembler: Discarding incomplete reassembly for new START");
existing->clear();
}
// Start new reassembly
if (!startReassembly(peer_identity, total_fragments)) {
return false;
}
}
// Look up pending reassembly
PendingReassemblySlot* slot = findSlot(peer_identity);
if (!slot) {
// No pending reassembly and this isn't a START
if (type != Fragment::START) {
// For single-fragment packets (type=END, total=1, seq=0), start immediately
if (type == Fragment::END && total_fragments == 1 && sequence == 0) {
if (!startReassembly(peer_identity, total_fragments)) {
return false;
}
slot = findSlot(peer_identity);
} else {
TRACE("BLEReassembler: Received fragment without START, discarding");
return false;
}
} else {
slot = findSlot(peer_identity);
}
}
if (!slot) {
ERROR("BLEReassembler: Failed to find/create reassembly session");
return false;
}
PendingReassembly& reassembly = slot->reassembly;
// Validate total_fragments matches
if (total_fragments != reassembly.total_fragments) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Fragment total mismatch, expected %u got %u",
reassembly.total_fragments, total_fragments);
TRACE(buf);
return false;
}
// Validate sequence is in range
if (sequence >= reassembly.total_fragments) {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Sequence out of range: %u", sequence);
TRACE(buf);
return false;
}
// Check for duplicate
if (reassembly.fragments[sequence].received) {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Duplicate fragment %u", sequence);
TRACE(buf);
// Still update last_activity to keep session alive
reassembly.last_activity = now;
return true; // Not an error, just duplicate
}
// Store fragment payload into fixed-size buffer
Bytes payload = BLEFragmenter::extractPayload(fragment);
if (payload.size() > MAX_FRAGMENT_PAYLOAD_SIZE) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Fragment payload too large: %zu > %zu",
payload.size(), MAX_FRAGMENT_PAYLOAD_SIZE);
WARNING(buf);
return false;
}
memcpy(reassembly.fragments[sequence].data, payload.data(), payload.size());
reassembly.fragments[sequence].data_size = payload.size();
reassembly.fragments[sequence].received = true;
reassembly.received_count++;
reassembly.last_activity = now;
{
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Received fragment %u/%u", sequence + 1, reassembly.total_fragments);
TRACE(buf);
}
// Check if complete
if (reassembly.received_count == reassembly.total_fragments) {
// Assemble complete packet
Bytes complete_packet = assembleFragments(reassembly);
{
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Completed reassembly, %zu bytes", complete_packet.size());
TRACE(buf);
}
// Remove from pending before callback (callback might trigger new data)
Bytes identity_copy = reassembly.peer_identity;
slot->clear();
// Invoke callback
if (_reassembly_callback) {
_reassembly_callback(identity_copy, complete_packet);
}
}
return true;
}
void BLEReassembler::checkTimeouts() {
double now = Utilities::OS::time();
// Find and clean up expired reassemblies
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
if (!_pending_pool[i].in_use) {
continue;
}
PendingReassembly& reassembly = _pending_pool[i].reassembly;
double age = now - reassembly.started_at;
if (age > _timeout_seconds) {
{
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Timeout waiting for fragments, received %u/%u",
reassembly.received_count, reassembly.total_fragments);
WARNING(buf);
}
// Copy identity before clearing
Bytes peer_identity = _pending_pool[i].transfer_id;
// Clear the slot
_pending_pool[i].clear();
// Invoke timeout callback after clearing (callback might start new reassembly)
if (_timeout_callback) {
_timeout_callback(peer_identity, "Reassembly timeout");
}
}
}
}
void BLEReassembler::clearForPeer(const Bytes& peer_identity) {
PendingReassemblySlot* slot = findSlot(peer_identity);
if (slot) {
TRACE("BLEReassembler: Clearing pending reassembly for peer");
slot->clear();
}
}
void BLEReassembler::clearAll() {
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Clearing all pending reassemblies (%zu sessions)", pendingCount());
TRACE(buf);
for (size_t i = 0; i < MAX_PENDING_REASSEMBLIES; i++) {
_pending_pool[i].clear();
}
}
bool BLEReassembler::hasPending(const Bytes& peer_identity) const {
return findSlot(peer_identity) != nullptr;
}
bool BLEReassembler::startReassembly(const Bytes& peer_identity, uint16_t total_fragments) {
// Validate fragment count fits in fixed-size array
if (total_fragments > MAX_FRAGMENTS_PER_REASSEMBLY) {
char buf[80];
snprintf(buf, sizeof(buf), "BLEReassembler: Too many fragments: %u > %zu",
total_fragments, MAX_FRAGMENTS_PER_REASSEMBLY);
WARNING(buf);
return false;
}
// Allocate a slot (reuses existing or finds free)
PendingReassemblySlot* slot = allocateSlot(peer_identity);
if (!slot) {
WARNING("BLEReassembler: Pool full, cannot start new reassembly");
return false;
}
double now = Utilities::OS::time();
// Initialize the reassembly state
PendingReassembly& reassembly = slot->reassembly;
reassembly.clear(); // Clear any old data
reassembly.peer_identity = peer_identity;
reassembly.total_fragments = total_fragments;
reassembly.received_count = 0;
reassembly.started_at = now;
reassembly.last_activity = now;
char buf[64];
snprintf(buf, sizeof(buf), "BLEReassembler: Starting reassembly for %u fragments", total_fragments);
TRACE(buf);
return true;
}
Bytes BLEReassembler::assembleFragments(const PendingReassembly& reassembly) {
// Calculate total size
size_t total_size = 0;
for (size_t i = 0; i < reassembly.total_fragments; i++) {
total_size += reassembly.fragments[i].data_size;
}
// Allocate result buffer
Bytes result(total_size);
uint8_t* ptr = result.writable(total_size);
result.resize(total_size);
// Concatenate fragments in order
size_t offset = 0;
for (size_t i = 0; i < reassembly.total_fragments; i++) {
const FragmentInfo& frag = reassembly.fragments[i];
if (frag.data_size > 0) {
memcpy(ptr + offset, frag.data, frag.data_size);
offset += frag.data_size;
}
}
return result;
}
}} // namespace RNS::BLE