From 629bbbc7c6096ef24fb54fd7337871c79fa310d3 Mon Sep 17 00:00:00 2001 From: Sudo-Ivan Date: Sun, 4 Jan 2026 19:10:22 -0600 Subject: [PATCH] refactor(meshchat): update map to render tiles faster (online), message handling by adding context support to forwarding and delivery methods; improve LXMF message processing and router initialization --- meshchatx/meshchat.py | 109 ++++++++++++------ meshchatx/src/backend/forwarding_manager.py | 6 +- meshchatx/src/backend/identity_context.py | 4 +- meshchatx/src/backend/meshchat_utils.py | 28 +++++ .../src/frontend/components/map/MapPage.vue | 17 ++- 5 files changed, 119 insertions(+), 45 deletions(-) diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 6cc42b8..5db9f8c 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -9091,7 +9091,7 @@ class ReticulumMeshChat: self.db_upsert_lxmf_message(lxmf_message, is_spam=is_spam, context=ctx) # handle forwarding - self.handle_forwarding(lxmf_message) + self.handle_forwarding(lxmf_message, context=ctx) # handle telemetry try: @@ -9200,8 +9200,12 @@ class ReticulumMeshChat: print(f"lxmf_delivery error: {e}") # handles lxmf message forwarding logic - def handle_forwarding(self, lxmf_message: LXMF.LXMessage): + def handle_forwarding(self, lxmf_message: LXMF.LXMessage, context=None): try: + ctx = context or self.current_context + if not ctx: + return + source_hash = lxmf_message.source_hash.hex() destination_hash = lxmf_message.destination_hash.hex() @@ -9227,7 +9231,7 @@ class ReticulumMeshChat: file_attachments_field = LxmfFileAttachmentsField(attachments) # check if this message is for an alias identity (REPLY PATH) - mapping = self.database.messages.get_forwarding_mapping( + mapping = ctx.database.messages.get_forwarding_mapping( alias_hash=destination_hash, ) @@ -9246,13 +9250,14 @@ class ReticulumMeshChat: image_field=image_field, audio_field=audio_field, file_attachments_field=file_attachments_field, + context=ctx, ), ) return # check if this message matches a forwarding rule (FORWARD PATH) # we check for rules that apply to the destination of this message - rules = self.database.misc.get_forwarding_rules( + rules = ctx.database.misc.get_forwarding_rules( identity_hash=destination_hash, active_only=True, ) @@ -9266,7 +9271,7 @@ class ReticulumMeshChat: continue # find or create mapping for this (Source, Final Recipient) pair - mapping = self.forwarding_manager.get_or_create_mapping( + mapping = ctx.forwarding_manager.get_or_create_mapping( source_hash, rule["forward_to_hash"], destination_hash, @@ -9287,6 +9292,7 @@ class ReticulumMeshChat: image_field=image_field, audio_field=audio_field, file_attachments_field=file_attachments_field, + context=ctx, ), ) except Exception as e: @@ -9296,9 +9302,9 @@ class ReticulumMeshChat: traceback.print_exc() # handle delivery status update for an outbound lxmf message - def on_lxmf_sending_state_updated(self, lxmf_message): + def on_lxmf_sending_state_updated(self, lxmf_message, context=None): # upsert lxmf message to database - self.db_upsert_lxmf_message(lxmf_message) + self.db_upsert_lxmf_message(lxmf_message, context=context) # send lxmf message state to all websocket clients AsyncUtils.run_async( @@ -9317,20 +9323,26 @@ class ReticulumMeshChat: ) # handle delivery failed for an outbound lxmf message - def on_lxmf_sending_failed(self, lxmf_message): + def on_lxmf_sending_failed(self, lxmf_message, context=None): # check if this failed message should fall back to sending via a propagation node if ( lxmf_message.state == LXMF.LXMessage.FAILED and hasattr(lxmf_message, "try_propagation_on_fail") and lxmf_message.try_propagation_on_fail ): - self.send_failed_message_via_propagation_node(lxmf_message) + self.send_failed_message_via_propagation_node(lxmf_message, context=context) # update state self.on_lxmf_sending_state_updated(lxmf_message) # sends a previously failed message via a propagation node - def send_failed_message_via_propagation_node(self, lxmf_message: LXMF.LXMessage): + def send_failed_message_via_propagation_node( + self, lxmf_message: LXMF.LXMessage, context=None + ): + ctx = context or self.current_context + if not ctx: + return + # reset internal message state lxmf_message.packed = None lxmf_message.delivery_attempts = 0 @@ -9343,12 +9355,12 @@ class ReticulumMeshChat: # resend message source_hash = lxmf_message.source_hash.hex() - router = self.message_router + router = ctx.message_router if ( - self.forwarding_manager - and source_hash in self.forwarding_manager.forwarding_routers + ctx.forwarding_manager + and source_hash in ctx.forwarding_manager.forwarding_routers ): - router = self.forwarding_manager.forwarding_routers[source_hash] + router = ctx.forwarding_manager.forwarding_routers[source_hash] router.handle_outbound(lxmf_message) # upserts the provided lxmf message to the database @@ -9393,7 +9405,12 @@ class ReticulumMeshChat: title: str = "", sender_identity_hash: str = None, no_display: bool = False, + context=None, ) -> LXMF.LXMessage: + ctx = context or self.current_context + if not ctx: + raise RuntimeError("No identity context available for sending message") + # convert destination hash to bytes destination_hash_bytes = bytes.fromhex(destination_hash) @@ -9442,7 +9459,7 @@ class ReticulumMeshChat: # send messages over a direct link by default desired_delivery_method = LXMF.LXMessage.DIRECT if ( - not self.message_router.delivery_link_available(destination_hash_bytes) + not ctx.message_router.delivery_link_available(destination_hash_bytes) and RNS.Identity.current_ratchet_id(destination_hash_bytes) is not None ): # since there's no link established to the destination, it's faster to send opportunistically @@ -9452,14 +9469,14 @@ class ReticulumMeshChat: desired_delivery_method = LXMF.LXMessage.OPPORTUNISTIC # determine which identity to send from - source_destination = self.local_lxmf_destination + source_destination = ctx.local_lxmf_destination if sender_identity_hash is not None: if ( - self.forwarding_manager + ctx.forwarding_manager and sender_identity_hash - in self.forwarding_manager.forwarding_destinations + in ctx.forwarding_manager.forwarding_destinations ): - source_destination = self.forwarding_manager.forwarding_destinations[ + source_destination = ctx.forwarding_manager.forwarding_destinations[ sender_identity_hash ] else: @@ -9476,7 +9493,7 @@ class ReticulumMeshChat: desired_method=desired_delivery_method, ) lxmf_message.try_propagation_on_fail = ( - self.config.auto_send_failed_messages_to_propagation_node.get() + ctx.config.auto_send_failed_messages_to_propagation_node.get() ) lxmf_message.fields = {} @@ -9541,28 +9558,33 @@ class ReticulumMeshChat: ] # update last sent icon hash for this destination - self.database.misc.update_last_sent_icon_hash( + ctx.database.misc.update_last_sent_icon_hash( destination_hash, current_icon_hash ) # register delivery callbacks - lxmf_message.register_delivery_callback(self.on_lxmf_sending_state_updated) - lxmf_message.register_failed_callback(self.on_lxmf_sending_failed) + lxmf_message.register_delivery_callback( + lambda msg: self.on_lxmf_sending_state_updated(msg, context=ctx) + ) + lxmf_message.register_failed_callback( + lambda msg: self.on_lxmf_sending_failed(msg, context=ctx) + ) # determine which router to use - router = self.message_router + router = ctx.message_router if ( sender_identity_hash is not None - and sender_identity_hash in self.forwarding_manager.forwarding_routers + and ctx.forwarding_manager + and sender_identity_hash in ctx.forwarding_manager.forwarding_routers ): - router = self.forwarding_manager.forwarding_routers[sender_identity_hash] + router = ctx.forwarding_manager.forwarding_routers[sender_identity_hash] # send lxmf message to be routed to destination router.handle_outbound(lxmf_message) # upsert lxmf message to database if not no_display: - self.db_upsert_lxmf_message(lxmf_message) + self.db_upsert_lxmf_message(lxmf_message, context=ctx) # tell all websocket clients that old failed message was deleted so it can remove from ui if not no_display: @@ -9583,7 +9605,9 @@ class ReticulumMeshChat: # otherwise other incoming websocket packets will not be processed until sending is complete # which results in the next message not showing up until the first message is finished if not no_display: - AsyncUtils.run_async(self.handle_lxmf_message_progress(lxmf_message)) + AsyncUtils.run_async( + self.handle_lxmf_message_progress(lxmf_message, context=ctx) + ) return lxmf_message @@ -9642,16 +9666,20 @@ class ReticulumMeshChat: print(f"Failed to respond to telemetry request: {e}") # updates lxmf message in database and broadcasts to websocket until it's delivered, or it fails - async def handle_lxmf_message_progress(self, lxmf_message): + async def handle_lxmf_message_progress(self, lxmf_message, context=None): # FIXME: there's no register_progress_callback on the lxmf message, so manually send progress until delivered, propagated or failed # we also can't use on_lxmf_sending_state_updated method to do this, because of async/await issues... + ctx = context or self.current_context + if not ctx: + return + should_update_message = True while should_update_message: # wait 1 second between sending updates await asyncio.sleep(1) # upsert lxmf message to database (as we want to update the progress in database too) - self.db_upsert_lxmf_message(lxmf_message) + self.db_upsert_lxmf_message(lxmf_message, context=ctx) # send update to websocket clients await self.websocket_broadcast( @@ -9816,9 +9844,11 @@ class ReticulumMeshChat: ) # resend all failed messages that were intended for this destination - if self.config.auto_resend_failed_messages_when_announce_received.get(): + if ctx.config.auto_resend_failed_messages_when_announce_received.get(): AsyncUtils.run_async( - self.resend_failed_messages_for_destination(destination_hash.hex()), + self.resend_failed_messages_for_destination( + destination_hash.hex(), context=ctx + ), ) # handle an announce received from reticulum, for an lxmf propagation node address @@ -9874,9 +9904,15 @@ class ReticulumMeshChat: ) # resends all messages that previously failed to send to the provided destination hash - async def resend_failed_messages_for_destination(self, destination_hash: str): + async def resend_failed_messages_for_destination( + self, destination_hash: str, context=None + ): + ctx = context or self.current_context + if not ctx: + return + # get messages that failed to send to this destination - failed_messages = self.database.messages.get_failed_messages_for_destination( + failed_messages = ctx.database.messages.get_failed_messages_for_destination( destination_hash, ) @@ -9915,7 +9951,7 @@ class ReticulumMeshChat: file_attachments_field = LxmfFileAttachmentsField(file_attachments) # don't resend message with attachments if not allowed - if not self.config.allow_auto_resending_failed_messages_with_attachments.get(): + if not ctx.config.allow_auto_resending_failed_messages_with_attachments.get(): if ( image_field is not None or audio_field is not None @@ -9933,10 +9969,11 @@ class ReticulumMeshChat: image_field=image_field, audio_field=audio_field, file_attachments_field=file_attachments_field, + context=ctx, ) # remove original failed message from database - self.database.messages.delete_lxmf_message_by_hash( + ctx.database.messages.delete_lxmf_message_by_hash( failed_message["hash"], ) diff --git a/meshchatx/src/backend/forwarding_manager.py b/meshchatx/src/backend/forwarding_manager.py index b86e0db..ce274f3 100644 --- a/meshchatx/src/backend/forwarding_manager.py +++ b/meshchatx/src/backend/forwarding_manager.py @@ -1,10 +1,10 @@ import base64 import os -import LXMF import RNS from .database import Database +from .meshchat_utils import create_lxmf_router class ForwardingManager: @@ -34,7 +34,7 @@ class ForwardingManager: ) os.makedirs(router_storage_path, exist_ok=True) - router = LXMF.LXMRouter( + router = create_lxmf_router( identity=alias_identity, storagepath=router_storage_path, ) @@ -79,7 +79,7 @@ class ForwardingManager: ) os.makedirs(router_storage_path, exist_ok=True) - router = LXMF.LXMRouter( + router = create_lxmf_router( identity=alias_identity, storagepath=router_storage_path, ) diff --git a/meshchatx/src/backend/identity_context.py b/meshchatx/src/backend/identity_context.py index 78f4d6a..86c5959 100644 --- a/meshchatx/src/backend/identity_context.py +++ b/meshchatx/src/backend/identity_context.py @@ -2,7 +2,6 @@ import os import asyncio import threading import RNS -import LXMF from meshchatx.src.backend.database import Database from meshchatx.src.backend.integrity_manager import IntegrityManager from meshchatx.src.backend.config_manager import ConfigManager @@ -21,6 +20,7 @@ from meshchatx.src.backend.rnpath_handler import RNPathHandler from meshchatx.src.backend.rnprobe_handler import RNProbeHandler from meshchatx.src.backend.translator_handler import TranslatorHandler from meshchatx.src.backend.forwarding_manager import ForwardingManager +from meshchatx.src.backend.meshchat_utils import create_lxmf_router from meshchatx.src.backend.announce_handler import AnnounceHandler from meshchatx.src.backend.community_interfaces import CommunityInterfacesManager @@ -168,7 +168,7 @@ class IdentityContext: # 4. Initialize LXMF Router propagation_stamp_cost = self.config.lxmf_propagation_node_stamp_cost.get() - self.message_router = LXMF.LXMRouter( + self.message_router = create_lxmf_router( identity=self.identity, storagepath=self.lxmf_router_path, propagation_cost=propagation_stamp_cost, diff --git a/meshchatx/src/backend/meshchat_utils.py b/meshchatx/src/backend/meshchat_utils.py index 26159da..2d655e2 100644 --- a/meshchatx/src/backend/meshchat_utils.py +++ b/meshchatx/src/backend/meshchat_utils.py @@ -1,11 +1,39 @@ import base64 import json +import signal +import threading import LXMF import RNS.vendor.umsgpack as msgpack from LXMF import LXMRouter +def create_lxmf_router(identity, storagepath, propagation_cost=None): + """ + Creates an LXMF.LXMRouter instance safely, avoiding signal handler crashes + when called from non-main threads. + """ + if threading.current_thread() != threading.main_thread(): + # signal.signal can only be called from the main thread in Python + # We monkeypatch it temporarily to avoid the ValueError + original_signal = signal.signal + try: + signal.signal = lambda s, h: None + return LXMF.LXMRouter( + identity=identity, + storagepath=storagepath, + propagation_cost=propagation_cost, + ) + finally: + signal.signal = original_signal + else: + return LXMF.LXMRouter( + identity=identity, + storagepath=storagepath, + propagation_cost=propagation_cost, + ) + + def parse_bool_query_param(value: str | None) -> bool: if value is None: return False diff --git a/meshchatx/src/frontend/components/map/MapPage.vue b/meshchatx/src/frontend/components/map/MapPage.vue index 0062bc9..ce63109 100644 --- a/meshchatx/src/frontend/components/map/MapPage.vue +++ b/meshchatx/src/frontend/components/map/MapPage.vue @@ -1520,7 +1520,10 @@ export default { throw new Error(`HTTP ${response.status}`); } const blob = await response.blob(); - tile.getImage().src = URL.createObjectURL(blob); + const url = URL.createObjectURL(blob); + tile.getImage().src = url; + // Cleanup to prevent memory leaks + setTimeout(() => URL.revokeObjectURL(url), 10000); } catch { tile.setState(3); } @@ -1535,7 +1538,9 @@ export default { try { const cached = await TileCache.getTile(src); if (cached) { - tile.getImage().src = URL.createObjectURL(cached); + const url = URL.createObjectURL(cached); + tile.getImage().src = url; + setTimeout(() => URL.revokeObjectURL(url), 10000); return; } @@ -1544,8 +1549,12 @@ export default { throw new Error(`HTTP ${response.status}`); } const blob = await response.blob(); - await TileCache.setTile(src, blob); - tile.getImage().src = URL.createObjectURL(blob); + const url = URL.createObjectURL(blob); + tile.getImage().src = url; + setTimeout(() => URL.revokeObjectURL(url), 10000); + + // Background cache write to avoid blocking UI + TileCache.setTile(src, blob).catch(() => {}); } catch { originalTileLoadFunction(tile, src); }