From 2fe14aceea978d1ab77b8c94a4516d7b73fb458f Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 13 Apr 2026 18:28:17 -0500 Subject: [PATCH] feat(rncp_handler): update RNCPHandler with event emission for transfer status and improve teardown logic for receive destination --- meshchatx/src/backend/rncp_handler.py | 89 +++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/meshchatx/src/backend/rncp_handler.py b/meshchatx/src/backend/rncp_handler.py index a8f057b..7c5a56c 100644 --- a/meshchatx/src/backend/rncp_handler.py +++ b/meshchatx/src/backend/rncp_handler.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import os import shutil import time @@ -21,6 +22,51 @@ class RNCPHandler: self.fetch_auto_compress = True self.allow_overwrite_on_receive = False self.allowed_identity_hashes = [] + self._listener_fetch_registered = False + self._listener_fetch_allowed = False + self.on_receive_completed = None + + def _emit_receive_event(self, payload): + if self.on_receive_completed: + try: + self.on_receive_completed(payload) + except Exception: + pass + + def teardown_receive_destination(self): + if self.receive_destination is None: + return + dest = self.receive_destination + self.receive_destination = None + if self._listener_fetch_registered: + with contextlib.suppress(Exception): + dest.deregister_request_handler("fetch_file") + self._listener_fetch_registered = False + self._listener_fetch_allowed = False + with contextlib.suppress(Exception): + RNS.Transport.deregister_destination(dest) + + def get_listener_status(self): + receive_dir = os.path.join(self.storage_dir, "rncp_received") + if self.receive_destination is None: + return { + "listening": False, + "destination_hash": None, + "allowed_hashes": [], + "fetch_allowed": False, + "fetch_jail": None, + "allow_overwrite": False, + "receive_directory": receive_dir, + } + return { + "listening": True, + "destination_hash": self.receive_destination.hash.hex(), + "allowed_hashes": [h.hex() for h in self.allowed_identity_hashes], + "fetch_allowed": self._listener_fetch_allowed, + "fetch_jail": self.fetch_jail, + "allow_overwrite": self.allow_overwrite_on_receive, + "receive_directory": receive_dir, + } def setup_receive_destination( self, @@ -29,6 +75,8 @@ class RNCPHandler: fetch_jail=None, allow_overwrite=False, ): + self.teardown_receive_destination() + if allowed_hashes: self.allowed_identity_hashes = [ bytes.fromhex(h) if isinstance(h, str) else h for h in allowed_hashes @@ -36,6 +84,7 @@ class RNCPHandler: self.fetch_jail = fetch_jail self.allow_overwrite_on_receive = allow_overwrite + self._listener_fetch_allowed = bool(fetch_allowed) identity_path = os.path.join(RNS.Reticulum.identitypath, self.APP_NAME) if os.path.isfile(identity_path): @@ -63,6 +112,7 @@ class RNCPHandler: allow=RNS.Destination.ALLOW_LIST, allowed_list=self.allowed_identity_hashes, ) + self._listener_fetch_registered = True return self.receive_destination.hash.hex() @@ -130,12 +180,39 @@ class RNCPHandler: saved_filename ) self.active_transfers[transfer_id]["filename"] = filename + self._emit_receive_event( + { + "transfer_id": transfer_id, + "status": "completed", + "saved_path": saved_filename, + "filename": filename, + "error": None, + }, + ) except Exception as e: if transfer_id in self.active_transfers: self.active_transfers[transfer_id]["status"] = "error" self.active_transfers[transfer_id]["error"] = str(e) + self._emit_receive_event( + { + "transfer_id": transfer_id, + "status": "error", + "saved_path": None, + "filename": None, + "error": str(e), + }, + ) elif transfer_id in self.active_transfers: self.active_transfers[transfer_id]["status"] = "failed" + self._emit_receive_event( + { + "transfer_id": transfer_id, + "status": "failed", + "saved_path": None, + "filename": None, + "error": None, + }, + ) def _fetch_request( self, @@ -189,6 +266,7 @@ class RNCPHandler: timeout: float = RNS.Transport.PATH_REQUEST_TIMEOUT, on_progress: Callable[[float], None] | None = None, no_compress: bool = False, + on_transfer_started: Callable[[str], None] | None = None, ): file_path = os.path.expanduser(file_path) if not os.path.isfile(file_path): @@ -252,6 +330,11 @@ class RNCPHandler: "started_at": time.time(), "file_path": file_path, } + if on_transfer_started: + try: + on_transfer_started(transfer_id) + except Exception: + pass while resource.status < RNS.Resource.COMPLETE: await asyncio.sleep(0.1) @@ -282,6 +365,7 @@ class RNCPHandler: on_progress: Callable[[float], None] | None = None, save_path: str | None = None, allow_overwrite: bool = False, + on_transfer_started: Callable[[str], None] | None = None, ): if not RNS.Transport.has_path(destination_hash): RNS.Transport.request_path(destination_hash) @@ -342,6 +426,11 @@ class RNCPHandler: def fetch_resource_started(resource): nonlocal resource_status, current_resource current_resource = resource + if on_transfer_started and hasattr(resource, "hash") and resource.hash: + try: + on_transfer_started(resource.hash.hex()) + except Exception: + pass def progress_callback(resource): if on_progress: