From c3aa68adb1cb064df86ec6eb6eb83668f904948d Mon Sep 17 00:00:00 2001 From: Ivan Date: Tue, 7 Apr 2026 15:37:03 -0500 Subject: [PATCH] feat(nomadnet): implement caching for active links and better download phase tracking in NomadnetDownloader Also added stats for page load time and size --- meshchatx/meshchat.py | 57 +++++- meshchatx/src/backend/nomadnet_downloader.py | 171 +++++++++++------- .../nomadnetwork/NomadNetworkPage.vue | 93 +++++++++- meshchatx/src/frontend/locales/de.json | 8 +- meshchatx/src/frontend/locales/en.json | 8 +- meshchatx/src/frontend/locales/it.json | 8 +- meshchatx/src/frontend/locales/ru.json | 8 +- .../backend/test_nomadnet_downloader_boost.py | 158 +++++++++++++++- tests/frontend/NomadNetworkPage.test.js | 10 + 9 files changed, 437 insertions(+), 84 deletions(-) diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 3f2a525..b6530ac 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -96,7 +96,7 @@ from meshchatx.src.backend.meshchat_utils import ( from meshchatx.src.backend.nomadnet_downloader import ( NomadnetFileDownloader, NomadnetPageDownloader, - nomadnet_cached_links, + get_cached_active_link, ) from meshchatx.src.backend.nomadnet_utils import ( convert_nomadnet_field_data_to_map, @@ -8140,15 +8140,14 @@ class ReticulumMeshChat: destination_hash = bytes.fromhex(destination_hash) # identify to existing active link - if destination_hash in nomadnet_cached_links: - link = nomadnet_cached_links[destination_hash] - if link.status is RNS.Link.ACTIVE: - link.identify(self.identity) - return web.json_response( - { - "message": "Identity has been sent!", - }, - ) + link = get_cached_active_link(destination_hash) + if link is not None: + link.identify(self.identity) + return web.json_response( + { + "message": "Identity has been sent!", + }, + ) # failed to identify return web.json_response( @@ -10623,6 +10622,24 @@ class ReticulumMeshChat: ), ) + def on_file_download_phase(phase: str): + AsyncUtils.run_async( + client.send_str( + json.dumps( + { + "type": "nomadnet.file.download", + "download_id": download_id, + "nomadnet_file_download": { + "status": "phase", + "load_phase": phase, + "destination_hash": destination_hash.hex(), + "file_path": file_path, + }, + }, + ), + ), + ) + # download the file downloader = NomadnetFileDownloader( destination_hash, @@ -10630,6 +10647,7 @@ class ReticulumMeshChat: on_file_download_success, on_file_download_failure, on_file_download_progress, + on_phase=on_file_download_phase, ) downloader.start_time = time.time() self.active_downloads[download_id] = downloader @@ -10797,6 +10815,24 @@ class ReticulumMeshChat: ), ) + def on_page_download_phase(phase: str): + AsyncUtils.run_async( + client.send_str( + json.dumps( + { + "type": "nomadnet.page.download", + "download_id": download_id, + "nomadnet_page_download": { + "status": "phase", + "load_phase": phase, + "destination_hash": destination_hash.hex(), + "page_path": page_path, + }, + }, + ), + ), + ) + # download the page downloader = NomadnetPageDownloader( destination_hash, @@ -10805,6 +10841,7 @@ class ReticulumMeshChat: on_page_download_success, on_page_download_failure, on_page_download_progress, + on_phase=on_page_download_phase, ) self.active_downloads[download_id] = downloader diff --git a/meshchatx/src/backend/nomadnet_downloader.py b/meshchatx/src/backend/nomadnet_downloader.py index 1c2901a..bf81ecc 100644 --- a/meshchatx/src/backend/nomadnet_downloader.py +++ b/meshchatx/src/backend/nomadnet_downloader.py @@ -1,13 +1,52 @@ import asyncio import io import os +import threading import time from collections.abc import Callable import RNS -# global cache for nomadnet links to avoid re-establishing them for every request -nomadnet_cached_links = {} +# Global cache for Nomad Network links (reuse instead of reconnecting per request). +# Protected by _nomadnet_links_lock for callers that may touch Reticulum from multiple threads. +nomadnet_cached_links: dict[bytes, object] = {} +_nomadnet_links_lock = threading.Lock() + +# Wait granularity while polling for path / link (seconds). Smaller = faster reaction, slightly more wakeups. +_POLL_INTERVAL_S = 0.02 + + +def get_cached_active_link(destination_hash: bytes): + """Return a cached link if present and ACTIVE; drop stale entries.""" + with _nomadnet_links_lock: + link = nomadnet_cached_links.get(destination_hash) + if link is None: + return None + if link.status is RNS.Link.ACTIVE: + return link + try: + del nomadnet_cached_links[destination_hash] + except KeyError: + pass + return None + + +def _cache_link_if_active(destination_hash: bytes, link) -> None: + if link is None or link.status is not RNS.Link.ACTIVE: + return + with _nomadnet_links_lock: + nomadnet_cached_links[destination_hash] = link + + +def _uncache_link_if_matches(destination_hash: bytes, link) -> None: + if link is None: + return + with _nomadnet_links_lock: + if nomadnet_cached_links.get(destination_hash) is link: + try: + del nomadnet_cached_links[destination_hash] + except KeyError: + pass class NomadnetDownloader: @@ -20,6 +59,8 @@ class NomadnetDownloader: on_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int | None = None, + *, + on_phase: Callable[[str], None] | None = None, ): self.app_name = "nomadnetwork" self.aspects = "node" @@ -30,77 +71,83 @@ class NomadnetDownloader: self._download_success_callback = on_download_success self._download_failure_callback = on_download_failure self.on_progress_update = on_progress_update + self._on_phase = on_phase self.request_receipt = None self.is_cancelled = False self.link = None - # cancel the download + def _emit_phase(self, phase: str) -> None: + if self._on_phase is None: + return + try: + self._on_phase(phase) + except Exception: + pass + def cancel(self): self.is_cancelled = True - # cancel the request if it exists if self.request_receipt is not None: try: self.request_receipt.cancel() except Exception as e: print(f"Failed to cancel request: {e}") - # clean up the link if we created it if self.link is not None: + _uncache_link_if_matches(self.destination_hash, self.link) try: self.link.teardown() except Exception as e: print(f"Failed to teardown link: {e}") - # notify that download was cancelled self._download_failure_callback("cancelled") - # setup link to destination and request download async def download( self, path_lookup_timeout: int = 15, link_establishment_timeout: int = 15, ): - # check if cancelled before starting if self.is_cancelled: return - # use existing established link if it's active - if self.destination_hash in nomadnet_cached_links: - link = nomadnet_cached_links[self.destination_hash] - if link.status is RNS.Link.ACTIVE: - print("[NomadnetDownloader] using existing link for request") - self.link_established(link) - return + cached = get_cached_active_link(self.destination_hash) + if cached is not None: + print("[NomadnetDownloader] using existing link for request") + self._emit_phase("requesting_page") + self.link = cached + self.link_established(cached) + return - # determine when to timeout timeout_after_seconds = time.time() + path_lookup_timeout - # check if we have a path to the destination if not RNS.Transport.has_path(self.destination_hash): - # we don't have a path, so we need to request it + self._emit_phase("finding_path") RNS.Transport.request_path(self.destination_hash) - # wait until we have a path, or give up after the configured timeout while ( not RNS.Transport.has_path(self.destination_hash) and time.time() < timeout_after_seconds ): - # check if cancelled during path lookup if self.is_cancelled: return - await asyncio.sleep(0.1) + await asyncio.sleep(_POLL_INTERVAL_S) - # if we still don't have a path, we can't establish a link, so bail out if not RNS.Transport.has_path(self.destination_hash): self._download_failure_callback("Could not find path to destination.") return - # check if cancelled before establishing link + cached = get_cached_active_link(self.destination_hash) + if cached is not None: + print("[NomadnetDownloader] using link cached while waiting for path") + self._emit_phase("requesting_page") + self.link = cached + self.link_established(cached) + return + if self.is_cancelled: return - # create destination to nomadnet node + self._emit_phase("establishing_link") identity = RNS.Identity.recall(self.destination_hash) destination = RNS.Destination( identity, @@ -110,37 +157,36 @@ class NomadnetDownloader: self.aspects, ) - # create link to destination + cached = get_cached_active_link(self.destination_hash) + if cached is not None: + print("[NomadnetDownloader] using link cached before establishing new link") + self._emit_phase("requesting_page") + self.link = cached + self.link_established(cached) + return + print("[NomadnetDownloader] establishing new link for request") link = RNS.Link(destination, established_callback=self.link_established) self.link = link - # determine when to timeout timeout_after_seconds = time.time() + link_establishment_timeout - # wait until we have established a link, or give up after the configured timeout - while ( - link.status is not RNS.Link.ACTIVE and time.time() < timeout_after_seconds - ): - # check if cancelled during link establishment + while link.status is not RNS.Link.ACTIVE and time.time() < timeout_after_seconds: if self.is_cancelled: return - await asyncio.sleep(0.1) + await asyncio.sleep(_POLL_INTERVAL_S) - # if we still haven't established a link, bail out if link.status is not RNS.Link.ACTIVE: self._download_failure_callback("Could not establish link to destination.") - # link to destination was established, we should now request the download def link_established(self, link): - # check if cancelled before requesting if self.is_cancelled: return - # cache link for using in future requests - nomadnet_cached_links[self.destination_hash] = link + self._emit_phase("transferring") + + _cache_link_if_active(self.destination_hash, link) - # request download over link self.request_receipt = link.request( self.path, data=self.data, @@ -150,15 +196,12 @@ class NomadnetDownloader: timeout=self.timeout, ) - # handle successful download def on_response(self, request_receipt: RNS.RequestReceipt): self._download_success_callback(request_receipt) - # handle failure def on_failed(self, request_receipt=None): self._download_failure_callback("request_failed") - # handle download progress def on_progress(self, request_receipt): self.on_progress_update(request_receipt.progress) @@ -173,6 +216,8 @@ class NomadnetPageDownloader(NomadnetDownloader): on_page_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int | None = None, + *, + on_phase: Callable[[str], None] | None = None, ): self.on_page_download_success = on_page_download_success self.on_page_download_failure = on_page_download_failure @@ -184,14 +229,21 @@ class NomadnetPageDownloader(NomadnetDownloader): self.on_download_failure, on_progress_update, timeout, + on_phase=on_phase, ) - # page download was successful, decode the response and send to provided callback def on_download_success(self, request_receipt: RNS.RequestReceipt): - micron_markup_response = request_receipt.response.decode("utf-8") + raw = request_receipt.response + if raw is None: + self.on_page_download_failure("empty_response") + return + try: + micron_markup_response = raw.decode("utf-8", errors="replace") + except (AttributeError, TypeError): + self.on_page_download_failure("invalid_response_body") + return self.on_page_download_success(micron_markup_response) - # page download failed, send error to provided callback def on_download_failure(self, failure_reason): self.on_page_download_failure(failure_reason) @@ -205,6 +257,8 @@ class NomadnetFileDownloader(NomadnetDownloader): on_file_download_failure: Callable[[str], None], on_progress_update: Callable[[float], None], timeout: int | None = None, + *, + on_phase: Callable[[str], None] | None = None, ): self.on_file_download_success = on_file_download_success self.on_file_download_failure = on_file_download_failure @@ -216,46 +270,42 @@ class NomadnetFileDownloader(NomadnetDownloader): self.on_download_failure, on_progress_update, timeout, + on_phase=on_phase, ) - # file download was successful, decode the response and send to provided callback def on_download_success(self, request_receipt: RNS.RequestReceipt): - # get response response = request_receipt.response - # handle buffered reader response if isinstance(response, io.BufferedReader): - # get file name from metadata file_name = "downloaded_file" metadata = request_receipt.metadata if metadata is not None and "name" in metadata: - file_path = metadata["name"].decode("utf-8") - file_name = os.path.basename(file_path) + try: + file_path = metadata["name"].decode("utf-8", errors="replace") + file_name = os.path.basename(file_path) + except (AttributeError, TypeError): + pass - # get file data file_data: bytes = response.read() self.on_file_download_success(file_name, file_data) return - # check for list response with bytes in position 0, and metadata dict in position 1 - # e.g: [file_bytes, {name: "filename.ext"}] - if isinstance(response, list) and isinstance(response[1], dict): + if isinstance(response, list) and len(response) > 1 and isinstance(response[1], dict): file_data: bytes = response[0] metadata: dict = response[1] - # get file name from metadata file_name = "downloaded_file" if metadata is not None and "name" in metadata: - file_path = metadata["name"].decode("utf-8") - file_name = os.path.basename(file_path) + try: + file_path = metadata["name"].decode("utf-8", errors="replace") + file_name = os.path.basename(file_path) + except (AttributeError, TypeError): + pass self.on_file_download_success(file_name, file_data) return - # try using original response format - # unsure if this is actually used anymore now that a buffered reader is provided - # have left here just in case... try: file_name: str = response[0] file_data: bytes = response[1] @@ -263,6 +313,5 @@ class NomadnetFileDownloader(NomadnetDownloader): except Exception: self.on_download_failure("unsupported_response") - # page download failed, send error to provided callback def on_download_failure(self, failure_reason): self.on_file_download_failure(failure_reason) diff --git a/meshchatx/src/frontend/components/nomadnetwork/NomadNetworkPage.vue b/meshchatx/src/frontend/components/nomadnetwork/NomadNetworkPage.vue index 5569318..2b605b7 100644 --- a/meshchatx/src/frontend/components/nomadnetwork/NomadNetworkPage.vue +++ b/meshchatx/src/frontend/components/nomadnetwork/NomadNetworkPage.vue @@ -79,8 +79,14 @@ @click="onDestinationPathClick(selectedNodePath)" > - {{ selectedNodePath.hops }} - {{ selectedNodePath.hops === 1 ? $t("app.hop") : $t("app.hops_plural") }} away + {{ selectedNodePath.hops === 1 ? $t("app.hop") : $t("app.hops_plural") }} + {{ $t("nomadnet.path_away_suffix") }} + + @@ -249,7 +255,7 @@ > -
Loading {{ nodePageProgress }}%
+
{{ nomadnetPageLoadingLine }}