feat(conversation): add API endpoints for managing conversation pins and improve path request handling

This commit is contained in:
Ivan
2026-04-12 17:03:32 -05:00
parent 417b66d0c8
commit 5f161b7e17
4 changed files with 150 additions and 15 deletions
+91 -14
View File
@@ -6860,6 +6860,26 @@ class ReticulumMeshChat:
},
)
# proactively ask Reticulum to resolve or refresh path (non-blocking HTTP; discovery runs in background)
@routes.post("/api/v1/destination/{destination_hash}/request-path")
async def destination_request_path_fire(request):
destination_hash = request.match_info.get("destination_hash", "")
try:
destination_hash_bytes = bytes.fromhex(destination_hash)
except Exception:
return web.json_response(
{
"message": "invalid destination hash",
},
status=400,
)
RNS.Transport.request_path(destination_hash_bytes)
return web.json_response(
{
"message": "ok",
},
)
# get signal metrics for a destination by checking the latest announce or lxmf message received from them
@routes.get("/api/v1/destination/{destination_hash}/signal-metrics")
async def destination_signal_metrics(request):
@@ -8241,6 +8261,14 @@ class ReticulumMeshChat:
# get source hash from local lxmf destination
local_hash = self.local_lxmf_destination.hash.hex()
for message_hash in self.database.messages.list_message_hashes_for_peer(
destination_hash
):
try:
self.message_router.cancel_outbound(bytes.fromhex(message_hash))
except Exception:
pass
# delete lxmf messages from db where "source to destination" or "destination to source"
self.message_handler.delete_conversation(local_hash, destination_hash)
@@ -8250,6 +8278,32 @@ class ReticulumMeshChat:
},
)
@routes.get("/api/v1/lxmf/conversation-pins")
async def lxmf_conversation_pins_get(request):
peer_hashes = self.database.messages.get_pinned_peer_hashes()
return web.json_response({"peer_hashes": peer_hashes})
@routes.post("/api/v1/lxmf/conversation-pins/toggle")
async def lxmf_conversation_pins_toggle(request):
try:
data = await request.json()
except Exception:
return web.json_response({"message": "invalid json"}, status=400)
destination_hash = (
data.get("destination_hash") if isinstance(data, dict) else None
)
if not destination_hash:
return web.json_response(
{"message": "missing destination_hash"}, status=400
)
pinned = self.database.messages.toggle_peer_pin(destination_hash)
return web.json_response(
{
"peer_hashes": self.database.messages.get_pinned_peer_hashes(),
"pinned": pinned,
},
)
# get lxmf conversations
@routes.get("/api/v1/lxmf/conversations")
async def lxmf_conversations_get(request):
@@ -8446,6 +8500,13 @@ class ReticulumMeshChat:
)
local_hash = self.local_lxmf_destination.hexhash
for dest_hash in destination_hashes:
for message_hash in self.database.messages.list_message_hashes_for_peer(
dest_hash
):
try:
self.message_router.cancel_outbound(bytes.fromhex(message_hash))
except Exception:
pass
self.message_handler.delete_conversation(local_hash, dest_hash)
return web.json_response({"message": "Conversations deleted"})
@@ -12141,6 +12202,33 @@ class ReticulumMeshChat:
ctx.database.messages.upsert_lxmf_message(lxmf_message_dict)
def _lxmf_path_wait_seconds(self):
try:
base = float(RNS.Transport.PATH_REQUEST_TIMEOUT)
except Exception:
base = 30.0
return max(30.0, min(base, 120.0))
async def _await_transport_path(self, destination_hash_bytes: bytes) -> bool:
deadline = time.time() + self._lxmf_path_wait_seconds()
if not RNS.Transport.has_path(destination_hash_bytes):
RNS.Transport.request_path(destination_hash_bytes)
while (
not RNS.Transport.has_path(destination_hash_bytes)
and time.time() < deadline
):
await asyncio.sleep(0.1)
if RNS.Transport.has_path(destination_hash_bytes):
return True
RNS.Transport.request_path(destination_hash_bytes)
deadline = time.time() + max(15.0, self._lxmf_path_wait_seconds() * 0.5)
while (
not RNS.Transport.has_path(destination_hash_bytes)
and time.time() < deadline
):
await asyncio.sleep(0.1)
return RNS.Transport.has_path(destination_hash_bytes)
# upserts the provided announce to the database
# handle sending an lxmf message to reticulum
async def send_message(
@@ -12167,20 +12255,9 @@ class ReticulumMeshChat:
# convert destination hash to bytes
destination_hash_bytes = bytes.fromhex(destination_hash)
# determine when to timeout finding path
timeout_after_seconds = time.time() + 10
# check if we have a path to the destination
if not RNS.Transport.has_path(destination_hash_bytes):
# we don't have a path, so we need to request it
RNS.Transport.request_path(destination_hash_bytes)
# wait until we have a path, or give up after the configured timeout
while (
not RNS.Transport.has_path(destination_hash_bytes)
and time.time() < timeout_after_seconds
):
await asyncio.sleep(0.1)
# Reticulum keeps a live path table; entries expire when peers move or links drop.
# We cannot replay "old" paths from the app layer — Transport.request_path refreshes discovery.
await self._await_transport_path(destination_hash_bytes)
# find destination identity from hash
destination_identity = RNS.Identity.recall(destination_hash_bytes)
@@ -65,6 +65,49 @@ class MessageDAO:
(message_hash,),
)
def list_message_hashes_for_peer(self, peer_hash):
rows = self.provider.fetchall(
"SELECT hash FROM lxmf_messages WHERE peer_hash = ?",
(peer_hash,),
)
return [r["hash"] for r in rows]
def get_pinned_peer_hashes(self):
rows = self.provider.fetchall(
"SELECT peer_hash FROM lxmf_conversation_pins ORDER BY pinned_at DESC",
)
return [r["peer_hash"] for r in rows]
def is_peer_pinned(self, peer_hash):
row = self.provider.fetchone(
"SELECT 1 AS ok FROM lxmf_conversation_pins WHERE peer_hash = ?",
(peer_hash,),
)
return row is not None
def set_peer_pinned(self, peer_hash, pinned):
if pinned:
self.provider.execute(
"""
INSERT INTO lxmf_conversation_pins (peer_hash, pinned_at)
VALUES (?, strftime('%s', 'now'))
ON CONFLICT(peer_hash) DO UPDATE SET pinned_at = EXCLUDED.pinned_at
""",
(peer_hash,),
)
else:
self.provider.execute(
"DELETE FROM lxmf_conversation_pins WHERE peer_hash = ?",
(peer_hash,),
)
def toggle_peer_pin(self, peer_hash):
if self.is_peer_pinned(peer_hash):
self.set_peer_pinned(peer_hash, False)
return False
self.set_peer_pinned(peer_hash, True)
return True
def delete_lxmf_messages_by_hashes(self, message_hashes):
if not message_hashes:
return
+12 -1
View File
@@ -13,7 +13,7 @@ def _validate_identifier(name: str, label: str = "identifier") -> str:
class DatabaseSchema:
LATEST_VERSION = 42
LATEST_VERSION = 43
def __init__(self, provider: DatabaseProvider):
self.provider = provider
@@ -1108,6 +1108,17 @@ class DatabaseSchema:
"CREATE INDEX IF NOT EXISTS idx_trusted_login_identity ON trusted_login_clients(identity_hash)",
)
if current_version < 43:
self._safe_execute("""
CREATE TABLE IF NOT EXISTS lxmf_conversation_pins (
peer_hash TEXT PRIMARY KEY NOT NULL,
pinned_at INTEGER NOT NULL
)
""")
self._safe_execute(
"CREATE INDEX IF NOT EXISTS idx_lxmf_conversation_pins_pinned_at ON lxmf_conversation_pins(pinned_at)",
)
# Update version in config
self._safe_execute(
"""
+4
View File
@@ -50,6 +50,10 @@ class MessageHandler:
"DELETE FROM lxmf_conversation_folders WHERE peer_hash = ?",
[destination_hash],
)
self.db.provider.execute(
"DELETE FROM lxmf_conversation_pins WHERE peer_hash = ?",
[destination_hash],
)
def search_messages(self, local_hash, search_term, limit=500):
search_term = _strip_utf16_surrogates(search_term) or ""