feat(rncp_handler): update RNCPHandler with event emission for transfer status and improve teardown logic for receive destination

This commit is contained in:
Ivan
2026-04-13 18:28:17 -05:00
parent 72a5aac40d
commit 2fe14aceea
+89
View File
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import os
import shutil
import time
@@ -21,6 +22,51 @@ class RNCPHandler:
self.fetch_auto_compress = True
self.allow_overwrite_on_receive = False
self.allowed_identity_hashes = []
self._listener_fetch_registered = False
self._listener_fetch_allowed = False
self.on_receive_completed = None
def _emit_receive_event(self, payload):
if self.on_receive_completed:
try:
self.on_receive_completed(payload)
except Exception:
pass
def teardown_receive_destination(self):
if self.receive_destination is None:
return
dest = self.receive_destination
self.receive_destination = None
if self._listener_fetch_registered:
with contextlib.suppress(Exception):
dest.deregister_request_handler("fetch_file")
self._listener_fetch_registered = False
self._listener_fetch_allowed = False
with contextlib.suppress(Exception):
RNS.Transport.deregister_destination(dest)
def get_listener_status(self):
receive_dir = os.path.join(self.storage_dir, "rncp_received")
if self.receive_destination is None:
return {
"listening": False,
"destination_hash": None,
"allowed_hashes": [],
"fetch_allowed": False,
"fetch_jail": None,
"allow_overwrite": False,
"receive_directory": receive_dir,
}
return {
"listening": True,
"destination_hash": self.receive_destination.hash.hex(),
"allowed_hashes": [h.hex() for h in self.allowed_identity_hashes],
"fetch_allowed": self._listener_fetch_allowed,
"fetch_jail": self.fetch_jail,
"allow_overwrite": self.allow_overwrite_on_receive,
"receive_directory": receive_dir,
}
def setup_receive_destination(
self,
@@ -29,6 +75,8 @@ class RNCPHandler:
fetch_jail=None,
allow_overwrite=False,
):
self.teardown_receive_destination()
if allowed_hashes:
self.allowed_identity_hashes = [
bytes.fromhex(h) if isinstance(h, str) else h for h in allowed_hashes
@@ -36,6 +84,7 @@ class RNCPHandler:
self.fetch_jail = fetch_jail
self.allow_overwrite_on_receive = allow_overwrite
self._listener_fetch_allowed = bool(fetch_allowed)
identity_path = os.path.join(RNS.Reticulum.identitypath, self.APP_NAME)
if os.path.isfile(identity_path):
@@ -63,6 +112,7 @@ class RNCPHandler:
allow=RNS.Destination.ALLOW_LIST,
allowed_list=self.allowed_identity_hashes,
)
self._listener_fetch_registered = True
return self.receive_destination.hash.hex()
@@ -130,12 +180,39 @@ class RNCPHandler:
saved_filename
)
self.active_transfers[transfer_id]["filename"] = filename
self._emit_receive_event(
{
"transfer_id": transfer_id,
"status": "completed",
"saved_path": saved_filename,
"filename": filename,
"error": None,
},
)
except Exception as e:
if transfer_id in self.active_transfers:
self.active_transfers[transfer_id]["status"] = "error"
self.active_transfers[transfer_id]["error"] = str(e)
self._emit_receive_event(
{
"transfer_id": transfer_id,
"status": "error",
"saved_path": None,
"filename": None,
"error": str(e),
},
)
elif transfer_id in self.active_transfers:
self.active_transfers[transfer_id]["status"] = "failed"
self._emit_receive_event(
{
"transfer_id": transfer_id,
"status": "failed",
"saved_path": None,
"filename": None,
"error": None,
},
)
def _fetch_request(
self,
@@ -189,6 +266,7 @@ class RNCPHandler:
timeout: float = RNS.Transport.PATH_REQUEST_TIMEOUT,
on_progress: Callable[[float], None] | None = None,
no_compress: bool = False,
on_transfer_started: Callable[[str], None] | None = None,
):
file_path = os.path.expanduser(file_path)
if not os.path.isfile(file_path):
@@ -252,6 +330,11 @@ class RNCPHandler:
"started_at": time.time(),
"file_path": file_path,
}
if on_transfer_started:
try:
on_transfer_started(transfer_id)
except Exception:
pass
while resource.status < RNS.Resource.COMPLETE:
await asyncio.sleep(0.1)
@@ -282,6 +365,7 @@ class RNCPHandler:
on_progress: Callable[[float], None] | None = None,
save_path: str | None = None,
allow_overwrite: bool = False,
on_transfer_started: Callable[[str], None] | None = None,
):
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
@@ -342,6 +426,11 @@ class RNCPHandler:
def fetch_resource_started(resource):
nonlocal resource_status, current_resource
current_resource = resource
if on_transfer_started and hasattr(resource, "hash") and resource.hash:
try:
on_transfer_started(resource.hash.hex())
except Exception:
pass
def progress_callback(resource):
if on_progress: