From 5f161b7e17ca98dbca3799f0a9b2ece1d72f6b1a Mon Sep 17 00:00:00 2001 From: Ivan Date: Sun, 12 Apr 2026 17:03:32 -0500 Subject: [PATCH] feat(conversation): add API endpoints for managing conversation pins and improve path request handling --- meshchatx/meshchat.py | 105 ++++++++++++++++++--- meshchatx/src/backend/database/messages.py | 43 +++++++++ meshchatx/src/backend/database/schema.py | 13 ++- meshchatx/src/backend/message_handler.py | 4 + 4 files changed, 150 insertions(+), 15 deletions(-) diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index eec7e3c..9b9ae1c 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -6860,6 +6860,26 @@ class ReticulumMeshChat: }, ) + # proactively ask Reticulum to resolve or refresh path (non-blocking HTTP; discovery runs in background) + @routes.post("/api/v1/destination/{destination_hash}/request-path") + async def destination_request_path_fire(request): + destination_hash = request.match_info.get("destination_hash", "") + try: + destination_hash_bytes = bytes.fromhex(destination_hash) + except Exception: + return web.json_response( + { + "message": "invalid destination hash", + }, + status=400, + ) + RNS.Transport.request_path(destination_hash_bytes) + return web.json_response( + { + "message": "ok", + }, + ) + # get signal metrics for a destination by checking the latest announce or lxmf message received from them @routes.get("/api/v1/destination/{destination_hash}/signal-metrics") async def destination_signal_metrics(request): @@ -8241,6 +8261,14 @@ class ReticulumMeshChat: # get source hash from local lxmf destination local_hash = self.local_lxmf_destination.hash.hex() + for message_hash in self.database.messages.list_message_hashes_for_peer( + destination_hash + ): + try: + self.message_router.cancel_outbound(bytes.fromhex(message_hash)) + except Exception: + pass + # delete lxmf messages from db where "source to destination" or "destination to source" self.message_handler.delete_conversation(local_hash, destination_hash) @@ -8250,6 +8278,32 @@ class ReticulumMeshChat: }, ) + @routes.get("/api/v1/lxmf/conversation-pins") + async def lxmf_conversation_pins_get(request): + peer_hashes = self.database.messages.get_pinned_peer_hashes() + return web.json_response({"peer_hashes": peer_hashes}) + + @routes.post("/api/v1/lxmf/conversation-pins/toggle") + async def lxmf_conversation_pins_toggle(request): + try: + data = await request.json() + except Exception: + return web.json_response({"message": "invalid json"}, status=400) + destination_hash = ( + data.get("destination_hash") if isinstance(data, dict) else None + ) + if not destination_hash: + return web.json_response( + {"message": "missing destination_hash"}, status=400 + ) + pinned = self.database.messages.toggle_peer_pin(destination_hash) + return web.json_response( + { + "peer_hashes": self.database.messages.get_pinned_peer_hashes(), + "pinned": pinned, + }, + ) + # get lxmf conversations @routes.get("/api/v1/lxmf/conversations") async def lxmf_conversations_get(request): @@ -8446,6 +8500,13 @@ class ReticulumMeshChat: ) local_hash = self.local_lxmf_destination.hexhash for dest_hash in destination_hashes: + for message_hash in self.database.messages.list_message_hashes_for_peer( + dest_hash + ): + try: + self.message_router.cancel_outbound(bytes.fromhex(message_hash)) + except Exception: + pass self.message_handler.delete_conversation(local_hash, dest_hash) return web.json_response({"message": "Conversations deleted"}) @@ -12141,6 +12202,33 @@ class ReticulumMeshChat: ctx.database.messages.upsert_lxmf_message(lxmf_message_dict) + def _lxmf_path_wait_seconds(self): + try: + base = float(RNS.Transport.PATH_REQUEST_TIMEOUT) + except Exception: + base = 30.0 + return max(30.0, min(base, 120.0)) + + async def _await_transport_path(self, destination_hash_bytes: bytes) -> bool: + deadline = time.time() + self._lxmf_path_wait_seconds() + if not RNS.Transport.has_path(destination_hash_bytes): + RNS.Transport.request_path(destination_hash_bytes) + while ( + not RNS.Transport.has_path(destination_hash_bytes) + and time.time() < deadline + ): + await asyncio.sleep(0.1) + if RNS.Transport.has_path(destination_hash_bytes): + return True + RNS.Transport.request_path(destination_hash_bytes) + deadline = time.time() + max(15.0, self._lxmf_path_wait_seconds() * 0.5) + while ( + not RNS.Transport.has_path(destination_hash_bytes) + and time.time() < deadline + ): + await asyncio.sleep(0.1) + return RNS.Transport.has_path(destination_hash_bytes) + # upserts the provided announce to the database # handle sending an lxmf message to reticulum async def send_message( @@ -12167,20 +12255,9 @@ class ReticulumMeshChat: # convert destination hash to bytes destination_hash_bytes = bytes.fromhex(destination_hash) - # determine when to timeout finding path - timeout_after_seconds = time.time() + 10 - - # check if we have a path to the destination - if not RNS.Transport.has_path(destination_hash_bytes): - # we don't have a path, so we need to request it - RNS.Transport.request_path(destination_hash_bytes) - - # wait until we have a path, or give up after the configured timeout - while ( - not RNS.Transport.has_path(destination_hash_bytes) - and time.time() < timeout_after_seconds - ): - await asyncio.sleep(0.1) + # Reticulum keeps a live path table; entries expire when peers move or links drop. + # We cannot replay "old" paths from the app layer — Transport.request_path refreshes discovery. + await self._await_transport_path(destination_hash_bytes) # find destination identity from hash destination_identity = RNS.Identity.recall(destination_hash_bytes) diff --git a/meshchatx/src/backend/database/messages.py b/meshchatx/src/backend/database/messages.py index 38f88a4..af21db3 100644 --- a/meshchatx/src/backend/database/messages.py +++ b/meshchatx/src/backend/database/messages.py @@ -65,6 +65,49 @@ class MessageDAO: (message_hash,), ) + def list_message_hashes_for_peer(self, peer_hash): + rows = self.provider.fetchall( + "SELECT hash FROM lxmf_messages WHERE peer_hash = ?", + (peer_hash,), + ) + return [r["hash"] for r in rows] + + def get_pinned_peer_hashes(self): + rows = self.provider.fetchall( + "SELECT peer_hash FROM lxmf_conversation_pins ORDER BY pinned_at DESC", + ) + return [r["peer_hash"] for r in rows] + + def is_peer_pinned(self, peer_hash): + row = self.provider.fetchone( + "SELECT 1 AS ok FROM lxmf_conversation_pins WHERE peer_hash = ?", + (peer_hash,), + ) + return row is not None + + def set_peer_pinned(self, peer_hash, pinned): + if pinned: + self.provider.execute( + """ + INSERT INTO lxmf_conversation_pins (peer_hash, pinned_at) + VALUES (?, strftime('%s', 'now')) + ON CONFLICT(peer_hash) DO UPDATE SET pinned_at = EXCLUDED.pinned_at + """, + (peer_hash,), + ) + else: + self.provider.execute( + "DELETE FROM lxmf_conversation_pins WHERE peer_hash = ?", + (peer_hash,), + ) + + def toggle_peer_pin(self, peer_hash): + if self.is_peer_pinned(peer_hash): + self.set_peer_pinned(peer_hash, False) + return False + self.set_peer_pinned(peer_hash, True) + return True + def delete_lxmf_messages_by_hashes(self, message_hashes): if not message_hashes: return diff --git a/meshchatx/src/backend/database/schema.py b/meshchatx/src/backend/database/schema.py index 42c8a3b..b79cc35 100644 --- a/meshchatx/src/backend/database/schema.py +++ b/meshchatx/src/backend/database/schema.py @@ -13,7 +13,7 @@ def _validate_identifier(name: str, label: str = "identifier") -> str: class DatabaseSchema: - LATEST_VERSION = 42 + LATEST_VERSION = 43 def __init__(self, provider: DatabaseProvider): self.provider = provider @@ -1108,6 +1108,17 @@ class DatabaseSchema: "CREATE INDEX IF NOT EXISTS idx_trusted_login_identity ON trusted_login_clients(identity_hash)", ) + if current_version < 43: + self._safe_execute(""" + CREATE TABLE IF NOT EXISTS lxmf_conversation_pins ( + peer_hash TEXT PRIMARY KEY NOT NULL, + pinned_at INTEGER NOT NULL + ) + """) + self._safe_execute( + "CREATE INDEX IF NOT EXISTS idx_lxmf_conversation_pins_pinned_at ON lxmf_conversation_pins(pinned_at)", + ) + # Update version in config self._safe_execute( """ diff --git a/meshchatx/src/backend/message_handler.py b/meshchatx/src/backend/message_handler.py index 4c3d257..93398cd 100644 --- a/meshchatx/src/backend/message_handler.py +++ b/meshchatx/src/backend/message_handler.py @@ -50,6 +50,10 @@ class MessageHandler: "DELETE FROM lxmf_conversation_folders WHERE peer_hash = ?", [destination_hash], ) + self.db.provider.execute( + "DELETE FROM lxmf_conversation_pins WHERE peer_hash = ?", + [destination_hash], + ) def search_messages(self, local_hash, search_term, limit=500): search_term = _strip_utf16_surrogates(search_term) or ""