From d63255511f97b283f03ebc95e35f12fbe900051b Mon Sep 17 00:00:00 2001 From: Ivan Date: Thu, 16 Apr 2026 19:13:32 -0500 Subject: [PATCH] feat(reticulum): implement RNS restart functionality with UI updates --- meshchatx/meshchat.py | 231 ++++++++++++++++-- .../frontend/components/about/AboutPage.vue | 23 ++ .../components/interfaces/InterfacesPage.vue | 9 + .../components/settings/SettingsPage.vue | 15 +- tests/backend/test_rns_lifecycle.py | 94 +++++++ tests/frontend/AboutPage.test.js | 11 + .../InterfacesDiscoveryActions.test.js | 22 ++ tests/frontend/UIComponents.test.js | 1 + 8 files changed, 380 insertions(+), 26 deletions(-) diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 9763164..681ecca 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -698,7 +698,7 @@ class ReticulumMeshChat: [[Default Interface]] type = AutoInterface - enabled = false + enabled = true name = Default Interface selected_interface_mode = 1 """ @@ -889,6 +889,152 @@ class ReticulumMeshChat: ), ) + def _force_close_listener(self, listener): + """Aggressively close a multiprocessing.connection.Listener. + + Calls Listener.close() and additionally drills through the inner + SocketListener wrapper to close the underlying socket file descriptor. + Necessary because the rpc_loop thread can retain references to the + listener that prevent the kernel from releasing abstract AF_UNIX + addresses on plain close(). + """ + try: + if hasattr(listener, "close"): + with contextlib.suppress(Exception): + listener.close() + finally: + socket_type = getattr(socket, "SocketType", None) + wrappers = [listener] + + listener_inner = getattr(listener, "_listener", None) + if listener_inner is not None: + wrappers.append(listener_inner) + + for wrapper in list(wrappers): + inner_socket = getattr(wrapper, "_socket", None) + if inner_socket is not None: + wrappers.append(inner_socket) + plain_socket = getattr(wrapper, "socket", None) + if plain_socket is not None: + wrappers.append(plain_socket) + + for obj in wrappers: + if socket_type is None or not isinstance(obj, socket_type): + continue + fileno = -1 + try: + fileno = obj.fileno() + except Exception: # noqa: S110 + pass + with contextlib.suppress(Exception): + obj.close() + if fileno != -1: + try: + os.close(fileno) + except OSError: + pass + + for wrapper in wrappers: + with contextlib.suppress(Exception): + if hasattr(wrapper, "_socket"): + wrapper._socket = None + with contextlib.suppress(Exception): + if hasattr(wrapper, "socket"): + wrapper.socket = None + with contextlib.suppress(Exception): + if hasattr(listener, "_listener"): + listener._listener = None + + def _force_close_abstract_unix_addr(self, addr) -> bool: + """Close every socket FD in the current process bound to addr. + + Returns True if any FD was closed. addr is expected to be a string + starting with a NUL byte (abstract AF_UNIX namespace). + """ + if not (isinstance(addr, str) and addr.startswith("\0")): + return False + + target_bytes = addr.encode("utf-8", errors="replace") + target_no_nul = target_bytes[1:] + closed_any = False + + try: + current_process = psutil.Process() + for conn in current_process.net_connections(kind="unix"): + try: + laddr = getattr(conn, "laddr", None) + fd = getattr(conn, "fd", -1) + if not laddr or fd in (-1, None): + continue + + if isinstance(laddr, str): + laddr_bytes = laddr.encode("utf-8", errors="replace") + elif isinstance(laddr, bytes): + laddr_bytes = laddr + else: + continue + + laddr_no_nul = ( + laddr_bytes[1:] + if laddr_bytes.startswith(b"\0") + else laddr_bytes + ) + + if ( + laddr_bytes == target_bytes + or laddr_bytes == target_no_nul + or laddr_no_nul == target_no_nul + ): + try: + os.close(fd) + closed_any = True + print( + f"Force closed lingering abstract UNIX FD {fd} for {addr[1:]}", + ) + except OSError as fd_err: + print( + f"Failed to close FD {fd} for {addr[1:]}: {fd_err}", + ) + except Exception: # noqa: S110 + pass + except Exception as e: + print(f"Error scanning process for abstract UNIX FDs: {e}") + + if closed_any: + gc.collect() + time.sleep(0.2) + + return closed_any + + def _read_reticulum_instance_name(self): + """Return current Reticulum instance_name from config or None.""" + config_dir = self._normalize_reticulum_config_dir( + getattr(self, "reticulum_config_dir", None), + ) + config_path = os.path.join(config_dir, "config") + if not os.path.isfile(config_path): + return None + + cp = configparser.ConfigParser() + cp.read(config_path) + if not cp.has_section("reticulum"): + return None + return cp.get("reticulum", "instance_name", fallback=None) + + def _write_reticulum_instance_name(self, instance_name): + """Persist a Reticulum instance_name value into the config.""" + config_dir = self._normalize_reticulum_config_dir( + getattr(self, "reticulum_config_dir", None), + ) + config_path = os.path.join(config_dir, "config") + cp = configparser.ConfigParser() + cp.read(config_path) + if not cp.has_section("reticulum"): + cp.add_section("reticulum") + cp.set("reticulum", "instance_name", instance_name) + with open(config_path, "w", encoding="utf-8") as f: + cp.write(f) + async def reload_reticulum(self): print("Hot reloading Reticulum stack...") # Keep reference to old reticulum instance for cleanup @@ -1004,9 +1150,7 @@ class ReticulumMeshChat: except Exception as e: print(f"Warning during aggressive interface cleanup: {e}") - # Close RPC listener if it exists on the instance if old_reticulum: - # Reticulum uses private attributes for the listener rpc_listener_names = [ "rpc_listener", "_Reticulum__rpc_listener", @@ -1020,8 +1164,7 @@ class ReticulumMeshChat: print( f"Forcing closure of RPC listener in {attr_name}...", ) - if hasattr(listener, "close"): - listener.close() + self._force_close_listener(listener) setattr(old_reticulum, attr_name, None) except Exception as e: print(f"Warning closing RPC listener {attr_name}: {e}") @@ -1066,8 +1209,6 @@ class ReticulumMeshChat: if hasattr(self, "reticulum"): del self.reticulum - # Wait another moment for sockets to definitely be released by OS - # Also give some time for the RPC listener port to settle print("Waiting for ports to settle...") await asyncio.sleep(4) @@ -1118,10 +1259,10 @@ class ReticulumMeshChat: print(f"Warning reading Reticulum config for ports: {e}") if not rpc_addrs: - # Defaults rpc_addrs.append((("127.0.0.1", 37429), "AF_INET")) rpc_addrs.append((("127.0.0.1", 37428), "AF_INET")) + abstract_unix_addr_in_use_after_wait = False for i in range(15): all_free = True for addr, family_str in rpc_addrs: @@ -1158,11 +1299,29 @@ class ReticulumMeshChat: and addr.startswith("\0") ) if is_abstract_unix_addr: - # Another local shared-instance daemon may own this address. - # Do not block reload on abstract UNIX sockets. - print( - f"Abstract RPC addr {addr_display} is occupied by another process; continuing reload.", - ) + released = False + if self._force_close_abstract_unix_addr(addr): + try: + s2 = socket.socket( + socket.AF_UNIX, + socket.SOCK_STREAM, + ) + try: + s2.bind(addr) + s2.close() + print( + f"Released abstract RPC addr {addr_display} from this process.", + ) + released = True + except OSError: + s2.close() + except Exception: # noqa: S110 + pass + + if released: + continue + + all_free = False continue all_free = False @@ -1282,9 +1441,16 @@ class ReticulumMeshChat: await asyncio.sleep(1) if not all_free: - # One last attempt with a very short sleep before failing await asyncio.sleep(2) - # Check again one last time + for addr, family_str in rpc_addrs: + if ( + family_str == "AF_UNIX" + and isinstance(addr, str) + and addr.startswith("\0") + ): + with contextlib.suppress(Exception): + self._force_close_abstract_unix_addr(addr) + last_check_all_free = True for addr, family_str in rpc_addrs: try: @@ -1298,8 +1464,16 @@ class ReticulumMeshChat: s.bind(addr) s.close() except OSError: - last_check_all_free = False s.close() + is_abstract = ( + family == socket.AF_UNIX + and isinstance(addr, str) + and addr.startswith("\0") + ) + if is_abstract: + abstract_unix_addr_in_use_after_wait = True + continue + last_check_all_free = False break except Exception: # noqa: S110 pass @@ -1312,20 +1486,37 @@ class ReticulumMeshChat: ) print("RNS ports finally free after last-second check.") - # Final GC to ensure everything is released gc.collect() - # Clear singleton right before spinning up a fresh RNS instance. if hasattr(RNS.Reticulum, "_Reticulum__instance"): RNS.Reticulum._Reticulum__instance = None - # Re-setup identity (this starts background loops again) + original_instance_name = None + switched_instance_name = None + if abstract_unix_addr_in_use_after_wait: + original_instance_name = self._read_reticulum_instance_name() + base_name = original_instance_name or "default" + switched_instance_name = ( + f"{base_name}-reload-{os.getpid()}-{int(time.time())}" + ) + self._write_reticulum_instance_name(switched_instance_name) + print( + "Abstract UNIX RPC address remained busy. " + f"Retrying with temporary instance_name={switched_instance_name}", + ) + self.running = True await self._send_rns_reload_status( "starting-services", "Starting identity services again...", ) - self.setup_identity(identity_to_restore) + try: + self.setup_identity(identity_to_restore) + finally: + if switched_instance_name: + self._write_reticulum_instance_name( + original_instance_name or "default", + ) await self._send_rns_reload_status( "done", "RNS reload complete.", diff --git a/meshchatx/src/frontend/components/about/AboutPage.vue b/meshchatx/src/frontend/components/about/AboutPage.vue index ac9c616..aaa52cf 100644 --- a/meshchatx/src/frontend/components/about/AboutPage.vue +++ b/meshchatx/src/frontend/components/about/AboutPage.vue @@ -72,6 +72,15 @@ {{ $t("common.restart_app") }} + + diff --git a/meshchatx/src/frontend/components/settings/SettingsPage.vue b/meshchatx/src/frontend/components/settings/SettingsPage.vue index 653b7e3..bed992e 100644 --- a/meshchatx/src/frontend/components/settings/SettingsPage.vue +++ b/meshchatx/src/frontend/components/settings/SettingsPage.vue @@ -2058,17 +2058,20 @@
-
+
-

- {{ $t("app.reload_rns_description") }} -

{ expect(shutdownSpy).toHaveBeenCalled(); }); + it("restartRns posts reticulum reload endpoint", async () => { + const wrapper = mountAboutPage(); + wrapper.vm.appInfo = { version: "1.0.0" }; + await wrapper.vm.$nextTick(); + + axiosMock.post.mockResolvedValueOnce({ data: { message: "RNS restarted" } }); + await wrapper.vm.restartRns(); + + expect(axiosMock.post).toHaveBeenCalledWith("/api/v1/reticulum/reload"); + }); + it("updates app info periodically", async () => { axiosMock.get.mockResolvedValue({ data: { diff --git a/tests/frontend/InterfacesDiscoveryActions.test.js b/tests/frontend/InterfacesDiscoveryActions.test.js index b776171..6147bcc 100644 --- a/tests/frontend/InterfacesDiscoveryActions.test.js +++ b/tests/frontend/InterfacesDiscoveryActions.test.js @@ -169,4 +169,26 @@ describe("InterfacesPage discovery actions", () => { interface_discovery_blacklist: "10.0.0.8:4242", }); }); + + it("reloadRns posts reticulum restart endpoint", async () => { + const wrapper = mount(InterfacesPage, { + global: { + stubs: { + RouterLink: true, + MaterialDesignIcon: true, + Toggle: true, + ImportInterfacesModal: true, + Interface: true, + }, + mocks: { + $t: (key) => key, + $router: { push: vi.fn() }, + }, + }, + }); + + await wrapper.vm.reloadRns(); + + expect(mockAxios.post).toHaveBeenCalledWith("/api/v1/reticulum/reload"); + }); }); diff --git a/tests/frontend/UIComponents.test.js b/tests/frontend/UIComponents.test.js index 356883e..a0d7385 100644 --- a/tests/frontend/UIComponents.test.js +++ b/tests/frontend/UIComponents.test.js @@ -539,6 +539,7 @@ describe("SettingsPage Component", () => { await wrapper.vm.$nextTick(); expect(wrapper.text()).toContain("app.reload_rns"); + expect(wrapper.text()).not.toContain("app.reload_rns_description"); }); it("enabling transport shows success toast after reload", async () => {