Add link-based LXMF delivery, fix display SPI contention

Display: switch pushPixelsDMA to blocking pushPixels to prevent SPI bus
contention between the display and SX1262 radio on the shared FSPI bus.

LXMF: add opportunistic-first delivery with background link upgrade.
First message to a peer sends immediately via opportunistic packet. A
link is established in the background; subsequent messages use the link
when active (prepending dest_hash for Python DIRECT format), falling
back to opportunistic otherwise. No user-facing delay.
This commit is contained in:
DeFiDude
2026-03-19 13:07:19 -06:00
parent 71849cbb62
commit 8b8a9b572f
3 changed files with 74 additions and 11 deletions
+2 -1
View File
@@ -11,7 +11,8 @@ static void lvgl_flush_cb(lv_disp_drv_t* drv, const lv_area_t* area, lv_color_t*
uint32_t h = area->y2 - area->y1 + 1;
s_gfx->startWrite();
s_gfx->setAddrWindow(area->x1, area->y1, w, h);
s_gfx->pushPixelsDMA((lgfx::swap565_t*)&color_p->full, w * h);
// Blocking pushPixels prevents SPI bus contention with SX1262 radio on shared FSPI bus
s_gfx->pushPixels((lgfx::swap565_t*)&color_p->full, w * h);
s_gfx->endWrite();
lv_disp_flush_ready(drv);
}
+65 -10
View File
@@ -135,25 +135,65 @@ bool LXMFManager::sendDirect(LXMFMessage& msg) {
Serial.printf("[DIAG] LXMF: outDest=%s msgDest=%s match=%s\n",
outDest.hash().toHex().c_str(), msg.destHash.toHex().c_str(),
(outDest.hash() == msg.destHash) ? "YES" : "NO");
// packFull returns opportunistic format: [src:16][sig:64][msgpack]
std::vector<uint8_t> payload = msg.packFull(_rns->identity());
if (payload.empty()) { Serial.println("[LXMF] packFull returned empty!"); msg.status = LXMFStatus::FAILED; return true; }
RNS::Bytes payloadBytes(payload.data(), payload.size());
if (payloadBytes.size() > RNS::Type::Reticulum::MDU) {
Serial.printf("[LXMF] payload too large: %d > MDU\n", (int)payloadBytes.size());
msg.status = LXMFStatus::FAILED; return true;
}
msg.status = LXMFStatus::SENDING;
Serial.printf("[LXMF] sending packet: %d bytes to %s\n", (int)payloadBytes.size(), outDest.hash().toHex().substr(0, 12).c_str());
RNS::Packet packet(outDest, payloadBytes);
RNS::PacketReceipt receipt = packet.send();
if (receipt) {
bool sent = false;
// Try link-based delivery if we have an active link to this peer
if (_outLink && _outLinkDestHash == msg.destHash
&& _outLink.status() == RNS::Type::Link::ACTIVE) {
// Link delivery: prepend dest_hash (Python DIRECT format)
std::vector<uint8_t> linkPayload;
linkPayload.reserve(16 + payload.size());
linkPayload.insert(linkPayload.end(), msg.destHash.data(), msg.destHash.data() + 16);
linkPayload.insert(linkPayload.end(), payload.begin(), payload.end());
RNS::Bytes linkBytes(linkPayload.data(), linkPayload.size());
if (linkBytes.size() <= RNS::Type::Reticulum::MDU) {
Serial.printf("[LXMF] sending via link: %d bytes to %s\n",
(int)linkBytes.size(), msg.destHash.toHex().substr(0, 8).c_str());
RNS::Packet packet(_outLink, linkBytes);
RNS::PacketReceipt receipt = packet.send();
if (receipt) { sent = true; }
}
}
// Fallback: opportunistic delivery (always available, no delay)
if (!sent) {
RNS::Bytes payloadBytes(payload.data(), payload.size());
if (payloadBytes.size() > RNS::Type::Reticulum::MDU) {
Serial.printf("[LXMF] payload too large: %d > MDU\n", (int)payloadBytes.size());
msg.status = LXMFStatus::FAILED; return true;
}
Serial.printf("[LXMF] sending opportunistic: %d bytes to %s\n",
(int)payloadBytes.size(), outDest.hash().toHex().substr(0, 12).c_str());
RNS::Packet packet(outDest, payloadBytes);
RNS::PacketReceipt receipt = packet.send();
if (receipt) { sent = true; }
}
if (sent) {
msg.status = LXMFStatus::SENT;
// messageId already computed by packFull() matching Python's LXMessage.pack()
Serial.printf("[LXMF] SENT OK: %d bytes, msgId=%s\n", (int)payloadBytes.size(), msg.messageId.toHex().substr(0, 8).c_str());
Serial.printf("[LXMF] SENT OK: msgId=%s\n", msg.messageId.toHex().substr(0, 8).c_str());
} else {
Serial.println("[LXMF] send FAILED: no receipt");
msg.status = LXMFStatus::FAILED;
}
// Background: establish link for future messages to this peer
if (!_outLinkPending && (!_outLink || _outLinkDestHash != msg.destHash
|| _outLink.status() == RNS::Type::Link::CLOSED)) {
_outLinkDestHash = msg.destHash;
_outLinkPending = true;
Serial.printf("[LXMF] Establishing link to %s for future messages\n",
msg.destHash.toHex().substr(0, 8).c_str());
RNS::Link newLink(outDest, onOutLinkEstablished, onOutLinkClosed);
}
return true;
}
@@ -169,6 +209,21 @@ void LXMFManager::onPacketReceived(const RNS::Bytes& data, const RNS::Packet& pa
_instance->processIncoming(fullData.data(), fullData.size(), destHash);
}
void LXMFManager::onOutLinkEstablished(RNS::Link& link) {
if (!_instance) return;
_instance->_outLink = link;
_instance->_outLinkPending = false;
Serial.printf("[LXMF] Outbound link established to %s\n",
_instance->_outLinkDestHash.toHex().substr(0, 8).c_str());
}
void LXMFManager::onOutLinkClosed(RNS::Link& link) {
if (!_instance) return;
_instance->_outLink = {RNS::Type::NONE};
_instance->_outLinkPending = false;
Serial.println("[LXMF] Outbound link closed");
}
void LXMFManager::onLinkEstablished(RNS::Link& link) {
if (!_instance) return;
link.set_packet_callback([](const RNS::Bytes& data, const RNS::Packet& packet) {
+7
View File
@@ -34,6 +34,8 @@ private:
void processIncoming(const uint8_t* data, size_t len, const RNS::Bytes& destHash);
static void onPacketReceived(const RNS::Bytes& data, const RNS::Packet& packet);
static void onLinkEstablished(RNS::Link& link);
static void onOutLinkEstablished(RNS::Link& link);
static void onOutLinkClosed(RNS::Link& link);
ReticulumManager* _rns = nullptr;
MessageStore* _store = nullptr;
@@ -41,6 +43,11 @@ private:
StatusCallback _statusCb;
std::deque<LXMFMessage> _outQueue;
// Outbound link state (opportunistic-first, link upgrades in background)
RNS::Link _outLink;
RNS::Bytes _outLinkDestHash;
bool _outLinkPending = false;
// Deduplication: recently seen message IDs
std::set<std::string> _seenMessageIds;
static constexpr int MAX_SEEN_IDS = 100;