Files
2026-05-06 18:44:17 +02:00

2022 lines
79 KiB
Python

import RNS
import os
import shutil
import time
import nomadnet
import LXMF
import urwid
from datetime import datetime, timedelta
from nomadnet.Directory import DirectoryEntry
from nomadnet.Conversation import ConversationMessage
from nomadnet.util import strip_modifiers
from nomadnet.util import sanitize_name
def relative_time(timestamp, now=None):
    """Return a short, human-readable age for a Unix timestamp.

    Args:
        timestamp: Seconds since the epoch of the event being described.
        now: Optional reference time in seconds since the epoch. Defaults
            to the current time; pass it explicitly to label several
            entries against the same instant.

    Returns:
        "just now" for anything less than a minute old (this includes
        timestamps that lie in the future), then "<n>m ago", "<n>h ago",
        "yesterday", "<n>d ago" and "<n>w ago". Anything 30 days or older
        is rendered as a local-time "YYYY-MM-DD" date.
    """
    if now is None:
        now = time.time()
    delta = now - timestamp

    # Negative deltas (clock skew, future timestamps) share this label.
    if delta < 60:
        return "just now"
    if delta < 3600:
        return str(int(delta / 60))+"m ago"
    if delta < 86400:
        return str(int(delta / 3600))+"h ago"
    if delta < 172800:
        return "yesterday"
    if delta < 604800:
        return str(int(delta / 86400))+"d ago"
    if delta < 2592000:
        return str(int(delta / 604800))+"w ago"
    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
def _format_size(size):
    """Render a byte count as a short string in B, KB or MB.

    Units are 1024-based; KB and MB values are rounded to one decimal.
    """
    kilobyte = 1024
    megabyte = 1048576
    if size >= megabyte:
        return str(round(size/megabyte, 1))+" MB"
    if size >= kilobyte:
        return str(round(size/kilobyte, 1))+" KB"
    return str(size)+" B"
from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox
class ConversationListDisplayShortcuts():
    """Shortcut bar shown while the conversation list has focus."""

    def __init__(self, app):
        self.app = app
        hint_text = urwid.Text("[C-e] Peer Info [C-x] Delete [C-r] Sync [C-n] New [C-u] Ingest URI [C-o] Sort [C-g] Fullscreen")
        self.widget = urwid.AttrMap(hint_text, "shortcutbar")
class ConversationDisplayShortcuts():
    """Shortcut bar shown while an open conversation has focus."""

    def __init__(self, app):
        self.app = app
        hint_text = urwid.Text("[C-d] Send [C-p] Paper Msg [C-t] Title [C-a] Attach [C-s] Save [C-k] Clear [C-w] Close [C-u] Purge [C-x] Clear History [C-o] Sort")
        self.widget = urwid.AttrMap(hint_text, "shortcutbar")
class ConversationsArea(urwid.LineBox):
    """Line box around the conversation list that handles its shortcuts.

    The ``delegate`` attribute is assigned by the owner after construction
    and receives every list-level action.
    """

    def keypress(self, size, key):
        # Key -> name of the delegate method that handles it.
        delegate_actions = {
            "ctrl e": "edit_selected_in_directory",
            "ctrl x": "delete_selected_conversation",
            "ctrl n": "new_conversation",
            "ctrl u": "ingest_lxm_uri",
            "ctrl r": "sync_conversations",
            "ctrl g": "toggle_fullscreen",
            "ctrl o": "toggle_list_sort",
        }

        action_name = delegate_actions.get(key)
        if action_name is not None:
            getattr(self.delegate, action_name)()
            return None

        if key == "tab":
            self.delegate.app.ui.main_display.frame.focus_position = "header"
            return None

        if key == "up":
            ilb = self.delegate.ilb
            # Leaving the list upwards is only possible from its top edge.
            if ilb.first_item_is_selected() or ilb.body_is_empty():
                self.delegate.app.ui.main_display.frame.focus_position = "header"
                return None

        return super(ConversationsArea, self).keypress(size, key)
class DialogLineBox(urwid.LineBox):
    """Line box used for dialogs; Escape hands control back to the delegate.

    The ``delegate`` attribute is assigned by whoever creates the dialog.
    """

    def keypress(self, size, key):
        if key != "esc":
            return super(DialogLineBox, self).keypress(size, key)

        owner = self.delegate
        if hasattr(owner, "update_conversation_list"):
            # List-side owner: rebuilding the list column replaces the overlay.
            owner.update_conversation_list()
        elif hasattr(owner, "dialog_active"):
            # Conversation-side owner: clear its flag and let it redraw.
            owner.dialog_active = False
            owner.conversation_changed(None)
        return None
class ConversationsDisplay():
list_width = 0.33
given_list_width = 52
cached_conversation_widgets = {}
SORT_RECENT = 0
SORT_NAME = 1
def __init__(self, app):
    """Build the two-column conversations view.

    Column 0 is the conversation list, column 1 the conversation pane
    (initially the placeholder widget created for ``None``).

    Args:
        app: Application object; stored as ``self.app`` and passed on to
            the shortcut-bar helpers.
    """
    self.app = app
    self.dialog_open = False
    self.sync_dialog = None
    self.currently_displayed_conversation = None
    self.list_sort_mode = ConversationsDisplay.SORT_RECENT

    # Creates self.listbox, which the column layout below requires.
    self.update_listbox()

    self.columns_widget = urwid.Columns(
        [
            (ConversationsDisplay.given_list_width, self.listbox),
            (urwid.WEIGHT, 1, self.make_conversation_widget(None)),
        ],
        dividechars=0, focus_column=0, box_columns=[0]
    )

    self.list_shortcuts = ConversationListDisplayShortcuts(self.app)
    self.editor_shortcuts = ConversationDisplayShortcuts(self.app)
    self.shortcuts_display = self.list_shortcuts
    self.widget = self.columns_widget

    # Register the list refresh as the Conversation class' created_callback.
    nomadnet.Conversation.created_callback = self.update_conversation_list
def focus_change_event(self):
if not self.dialog_open:
self.update_conversation_list()
def toggle_list_sort(self):
if self.list_sort_mode == ConversationsDisplay.SORT_RECENT:
self.list_sort_mode = ConversationsDisplay.SORT_NAME
else:
self.list_sort_mode = ConversationsDisplay.SORT_RECENT
self.update_conversation_list()
def update_listbox(self):
    """Rebuild the list entry widgets and the list box that holds them.

    Sets ``self.list_widgets``, ``self.ilb`` and ``self.listbox``.
    """
    entries = self.app.conversations()
    if self.list_sort_mode == ConversationsDisplay.SORT_NAME:
        # Sort on lower-cased field 3, with field 0 as tie-breaker.
        entries.sort(key=lambda entry: (entry[3].lower(), entry[0]))

    self.list_widgets = [self.conversation_list_widget(entry) for entry in entries]

    self.ilb = IndicativeListBox(
        self.list_widgets,
        on_selection_change=self.conversation_list_selection,
        initialization_is_selection_change=False,
        highlight_offFocus="list_off_focus"
    )

    list_body = urwid.Filler(self.ilb, height=urwid.RELATIVE_100)
    self.listbox = ConversationsArea(list_body, title="Conversations")
    self.listbox.delegate = self
def delete_selected_conversation(self):
    """Ask for confirmation, then delete the conversation selected in the list.

    Does nothing when the list has no selected item. Otherwise a Yes/No
    dialog is laid over the list column; "Yes" closes any open widget for
    the conversation and deletes it, "No" just restores the list.
    """
    item = self.ilb.get_selected_item()
    if item is None:
        # Return before flagging a dialog as open: no dialog is shown, and
        # a stale flag would stop focus_change_event from refreshing.
        return

    self.dialog_open = True
    source_hash = item.source_hash

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        self.dialog_open = False
        self.delete_conversation(source_hash)
        nomadnet.Conversation.delete_conversation(source_hash, self.app)
        self.update_conversation_list()

    dialog = DialogLineBox(
        urwid.Pile([
            urwid.Text(
                "Delete conversation with\n"+self.app.directory.simplest_display_str(bytes.fromhex(source_hash))+"\n",
                align=urwid.CENTER,
            ),
            urwid.Columns([
                (urwid.WEIGHT, 0.45, urwid.Button("Yes", on_press=confirmed)),
                (urwid.WEIGHT, 0.1, urwid.Text("")),
                (urwid.WEIGHT, 0.45, urwid.Button("No", on_press=dismiss_dialog)),
            ])
        ]), title="?"
    )
    dialog.delegate = self
    bottom = self.listbox

    overlay = urwid.Overlay(
        dialog,
        bottom,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    # Column 0 normally holds the list; swap in the overlay while asking.
    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def edit_selected_in_directory(self):
    """Open the "Peer Info" dialog for the conversation selected in the list.

    The dialog shows the peer address, an editable name, the trust level
    and the preferred delivery method, pre-filled from the directory when
    an entry exists. "Save" stores a new DirectoryEntry; when the peer is
    not known to the directory, a button to query the network for its
    keys is offered as well. Does nothing when nothing is selected.
    """
    g = self.app.ui.glyphs
    item = self.ilb.get_selected_item()
    if item is None:
        # Return before flagging a dialog as open: no dialog is shown, and
        # a stale flag would stop focus_change_event from refreshing.
        return

    self.dialog_open = True
    source_hash_text = item.source_hash
    display_name = item.display_name
    if display_name is None:
        display_name = ""

    # e_id is never placed in the dialog (the address is shown read-only
    # through t_id); it only carries the address text to the save handler.
    e_id = urwid.Edit(caption="Addr : ",edit_text=source_hash_text)
    t_id = urwid.Text("Addr : "+source_hash_text)
    e_name = urwid.Edit(caption="Name : ",edit_text=display_name)

    selected_id_widget = t_id

    # Defaults used when the directory has no entry for this peer.
    untrusted_selected = False
    unknown_selected = True
    trusted_selected = False

    direct_selected = True
    propagated_selected = False

    try:
        if self.app.directory.find(bytes.fromhex(source_hash_text)):
            trust_level = self.app.directory.trust_level(bytes.fromhex(source_hash_text))
            if trust_level == DirectoryEntry.UNTRUSTED:
                untrusted_selected = True
                unknown_selected = False
                trusted_selected = False
            elif trust_level == DirectoryEntry.UNKNOWN:
                untrusted_selected = False
                unknown_selected = True
                trusted_selected = False
            elif trust_level == DirectoryEntry.TRUSTED:
                untrusted_selected = False
                unknown_selected = False
                trusted_selected = True

            if self.app.directory.preferred_delivery(bytes.fromhex(source_hash_text)) == DirectoryEntry.PROPAGATED:
                direct_selected = False
                propagated_selected = True

    except Exception as e:
        # Best effort: fall back to the defaults above, but leave a trace.
        RNS.log("Could not load directory entry for peer info dialog. The contained exception was: "+str(e), RNS.LOG_DEBUG)

    trust_button_group = []
    r_untrusted = urwid.RadioButton(trust_button_group, "Untrusted", state=untrusted_selected)
    r_unknown = urwid.RadioButton(trust_button_group, "Unknown", state=unknown_selected)
    r_trusted = urwid.RadioButton(trust_button_group, "Trusted", state=trusted_selected)

    method_button_group = []
    r_direct = urwid.RadioButton(method_button_group, "Deliver directly", state=direct_selected)
    r_propagated = urwid.RadioButton(method_button_group, "Use propagation nodes", state=propagated_selected)

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        try:
            display_name = e_name.get_edit_text()
            source_hash = bytes.fromhex(e_id.get_edit_text())
            trust_level = DirectoryEntry.UNTRUSTED
            if r_unknown.state == True:
                trust_level = DirectoryEntry.UNKNOWN
            elif r_trusted.state == True:
                trust_level = DirectoryEntry.TRUSTED

            delivery = DirectoryEntry.DIRECT
            if r_propagated.state == True:
                delivery = DirectoryEntry.PROPAGATED

            entry = DirectoryEntry(source_hash, display_name, trust_level, preferred_delivery=delivery)
            self.app.directory.remember(entry)
            self.update_conversation_list()
            self.dialog_open = False
            self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
        except Exception as e:
            RNS.log("Could not save directory entry. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often saving fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((
                    urwid.Text(("error_text", "Could not save entry. Check your input."), align=urwid.CENTER),
                    options,)
                )

    source_is_known = self.app.directory.is_known(bytes.fromhex(source_hash_text))
    if source_is_known:
        known_section = urwid.Divider(g["divider1"])
    else:
        def query_action(sender, user_data):
            self.close_conversation_by_hash(user_data)
            nomadnet.Conversation.query_for_peer(user_data)
            # Replace the whole dialog body with a confirmation.
            options = dialog_pile.options(height_type=urwid.PACK)
            dialog_pile.contents = [
                (urwid.Text("Query sent"), options),
                (urwid.Button("OK", on_press=dismiss_dialog), options)
            ]
        query_button = urwid.Button("Query network for keys", on_press=query_action, user_data=source_hash_text)
        known_section = urwid.Pile([
            urwid.Divider(g["divider1"]),
            urwid.Text(g["info"]+"\n", align=urwid.CENTER),
            urwid.Text(
                "The identity of this peer is not known, and you cannot currently send messages to it. "
                "You can query the network to obtain the identity.\n",
                align=urwid.CENTER,
            ),
            query_button,
            urwid.Divider(g["divider1"]),
        ])

    dialog_pile = urwid.Pile([
        selected_id_widget,
        e_name,
        urwid.Divider(g["divider1"]),
        r_untrusted,
        r_unknown,
        r_trusted,
        urwid.Divider(g["divider1"]),
        r_direct,
        r_propagated,
        known_section,
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Save", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    dialog = DialogLineBox(dialog_pile, title="Peer Info")
    dialog.delegate = self
    bottom = self.listbox

    overlay = urwid.Overlay(
        dialog,
        bottom,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    # Column 0 normally holds the list; swap in the overlay while editing.
    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def new_conversation(self):
    """Open the "New Conversation" dialog over the conversation list.

    "Create" parses the entered address as hex, stores a directory entry
    and creates the conversation when the address is not already in the
    conversation list, and then displays the conversation. On any error a
    single error line is appended to the dialog.
    """
    self.dialog_open = True
    source_hash = ""
    display_name = ""

    e_id = urwid.Edit(caption="Addr : ",edit_text=source_hash)
    e_name = urwid.Edit(caption="Name : ",edit_text=display_name)

    trust_button_group = []
    r_untrusted = urwid.RadioButton(trust_button_group, "Untrusted")
    r_unknown = urwid.RadioButton(trust_button_group, "Unknown", state=True)
    r_trusted = urwid.RadioButton(trust_button_group, "Trusted")

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def confirmed(sender):
        try:
            existing_conversations = nomadnet.Conversation.conversation_list(self.app)

            display_name = e_name.get_edit_text()
            source_hash_text = e_id.get_edit_text().strip()
            # Raises ValueError on malformed input, handled below.
            source_hash = bytes.fromhex(source_hash_text)
            trust_level = DirectoryEntry.UNTRUSTED
            if r_unknown.state == True:
                trust_level = DirectoryEntry.UNKNOWN
            elif r_trusted.state == True:
                trust_level = DirectoryEntry.TRUSTED

            if not source_hash in [c[0] for c in existing_conversations]:
                entry = DirectoryEntry(source_hash, display_name, trust_level)
                self.app.directory.remember(entry)

                # Constructed for its side effect only; the instance is not kept.
                nomadnet.Conversation(source_hash_text, nomadnet.NomadNetworkApp.get_shared_instance(), initiator=True)
                self.update_conversation_list()

            # Must be passed by keyword: the first positional parameter of
            # display_conversation is the signal sender, not the hash.
            self.display_conversation(source_hash=source_hash_text)
            self.dialog_open = False

        except Exception as e:
            RNS.log("Could not start conversation. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often creation fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((
                    urwid.Text(
                        ("error_text", "Could not start conversation. Check your input."),
                        align=urwid.CENTER,
                    ),
                    options,
                ))

    dialog_pile = urwid.Pile([
        e_id,
        e_name,
        urwid.Text(""),
        r_untrusted,
        r_unknown,
        r_trusted,
        urwid.Text(""),
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Create", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    dialog = DialogLineBox(dialog_pile, title="New Conversation")
    dialog.delegate = self
    bottom = self.listbox

    overlay = urwid.Overlay(
        dialog,
        bottom,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    # Column 0 normally holds the list; swap in the overlay while editing.
    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def ingest_lxm_uri(self):
    """Open the "Ingest message URI" dialog over the conversation list.

    "Ingest" hands the entered URI to the message router and replaces the
    dialog with a result message that depends on what the router reports:
    local delivery, duplicate, or neither. A ``False`` result, or any
    exception, appends a single error line to the input dialog instead.
    """
    self.dialog_open = True
    lxm_uri = ""
    e_uri = urwid.Edit(caption="URI : ",edit_text=lxm_uri)

    def show_over_list(dialog_widget):
        # Lay a dialog over the list and put it into column 0.
        overlay = urwid.Overlay(
            dialog_widget,
            self.listbox,
            align=urwid.CENTER,
            width=urwid.RELATIVE_100,
            valign=urwid.MIDDLE,
            height=urwid.PACK,
            left=2,
            right=2,
        )
        options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
        self.columns_widget.contents[0] = (overlay, options)

    def dismiss_dialog(sender):
        self.update_conversation_list()
        self.dialog_open = False

    def show_result(message):
        # Replace the input dialog with a message and a single OK button.
        rdialog_pile = urwid.Pile([
            urwid.Text(message),
            urwid.Text(""),
            urwid.Columns([
                (urwid.WEIGHT, 0.6, urwid.Text("")),
                (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=dismiss_dialog)),
            ])
        ])
        rdialog_pile.error_display = False

        rdialog = DialogLineBox(rdialog_pile, title="Ingest message URI")
        rdialog.delegate = self
        show_over_list(rdialog)

    def confirmed(sender):
        try:
            # Marker values the router is asked to return for these outcomes.
            local_delivery_signal = "local_delivery_occurred"
            duplicate_signal = "duplicate_lxm"
            lxm_uri = e_uri.get_edit_text().strip()
            ingest_result = self.app.message_router.ingest_lxm_uri(
                lxm_uri,
                signal_local_delivery=local_delivery_signal,
                signal_duplicate=duplicate_signal
            )

            if ingest_result == False:
                raise ValueError("The URI contained no decodable messages")

            elif ingest_result == local_delivery_signal:
                result_text = "Message was decoded, decrypted successfully, and added to your conversation list."

            elif ingest_result == duplicate_signal:
                result_text = "The decoded message has already been processed by the LXMF Router, and will not be ingested again."

            elif self.app.enable_node:
                result_text = "The decoded message was not addressed to this LXMF address, but has been added to the propagation node queues, and will be distributed on the propagation network."

            else:
                result_text = "The decoded message was not addressed to this LXMF address, and has been discarded."

            show_result(result_text)

        except Exception as e:
            RNS.log("Could not ingest LXM URI. The contained exception was: "+str(e), RNS.LOG_VERBOSE)
            # Append the error line only once, however often ingestion fails.
            if not dialog_pile.error_display:
                dialog_pile.error_display = True
                options = dialog_pile.options(height_type=urwid.PACK)
                dialog_pile.contents.append((urwid.Text(""), options))
                dialog_pile.contents.append((urwid.Text(("error_text", "Could not ingest LXM from URI data. Check your input."), align=urwid.CENTER), options))

    dialog_pile = urwid.Pile([
        e_uri,
        urwid.Text(""),
        urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Ingest", on_press=confirmed)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Back", on_press=dismiss_dialog)),
        ])
    ])
    dialog_pile.error_display = False

    dialog = DialogLineBox(dialog_pile, title="Ingest message URI")
    dialog.delegate = self
    show_over_list(dialog)
def delete_conversation(self, source_hash):
if source_hash in ConversationsDisplay.cached_conversation_widgets:
conversation = ConversationsDisplay.cached_conversation_widgets[source_hash]
self.close_conversation(conversation)
def toggle_fullscreen(self):
if ConversationsDisplay.given_list_width != 0:
self.saved_list_width = ConversationsDisplay.given_list_width
ConversationsDisplay.given_list_width = 0
else:
ConversationsDisplay.given_list_width = self.saved_list_width
self.update_conversation_list()
def sync_conversations(self):
    """Open the "Message Sync" dialog over the conversation list.

    When the identity of the default propagation node can be recalled,
    the dialog offers a progress bar, a message-limit choice and a
    sync/cancel button. Otherwise it explains that no node is available
    and only offers "Close". The dialog is kept in ``self.sync_dialog``
    so that update_sync_dialog can refresh it.
    """
    glyphs = self.app.ui.glyphs
    self.dialog_open = True

    def dismiss_dialog(sender):
        self.dialog_open = False
        self.sync_dialog = None
        self.update_conversation_list()
        if self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE:
            self.app.cancel_lxmf_sync()

    max_messages_group = []
    r_mall = urwid.RadioButton(max_messages_group, "Download all", state=True)
    r_mlim = urwid.RadioButton(max_messages_group, "Limit to", state=False)
    ie_lim = urwid.IntEdit("", 5)
    rbs = urwid.GridFlow([r_mlim, ie_lim], 12, 1, 0, align=urwid.LEFT)

    def sync_now(sender):
        # No limit unless the "Limit to" radio button is selected.
        limit = ie_lim.value() if r_mlim.get_state() else None
        self.app.request_lxmf_sync(limit)
        self.update_sync_dialog()

    def cancel_sync(sender):
        self.app.cancel_lxmf_sync()
        self.update_sync_dialog()

    cancel_button = urwid.Button("Close", on_press=dismiss_dialog)
    sync_progress = SyncProgressBar("progress_empty" , "progress_full", current=self.app.get_sync_progress(), done=1.0, satt=None)

    real_sync_button = urwid.Button("Sync Now", on_press=sync_now)
    hidden_sync_button = urwid.Button("Cancel Sync", on_press=cancel_sync)

    # "Sync Now" while idle or finished, "Cancel Sync" while in progress.
    sync_is_startable = (
        self.app.get_sync_status() == "Idle"
        or self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE
    )
    sync_button = real_sync_button if sync_is_startable else hidden_sync_button

    button_columns = urwid.Columns([
        (urwid.WEIGHT, 0.45, sync_button),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, cancel_button),
    ])
    real_sync_button.bc = button_columns

    pn_ident = None
    if self.app.get_default_propagation_node() != None:
        pn_hash = self.app.get_default_propagation_node()
        pn_ident = RNS.Identity.recall(pn_hash)

        if pn_ident == None:
            RNS.log("Propagation node identity is unknown, requesting from network...", RNS.LOG_DEBUG)
            RNS.Transport.request_path(pn_hash)

    if pn_ident != None:
        node_hash = RNS.Destination.hash_from_name_and_identity("nomadnetwork.node", pn_ident)
        pn_entry = self.app.directory.find(node_hash)

        # Prefer the directory's display name, else the pretty-printed hash.
        pn_display_str = " "
        if pn_entry != None:
            pn_display_str += " "+str(pn_entry.display_name)
        else:
            pn_display_str += " "+RNS.prettyhexrep(pn_hash)

        dialog_body = urwid.Pile([
            urwid.Text(""+glyphs["node"]+pn_display_str, align=urwid.CENTER),
            urwid.Divider(glyphs["divider1"]),
            sync_progress,
            urwid.Divider(glyphs["divider1"]),
            r_mall,
            rbs,
            urwid.Text(""),
            button_columns
        ])
    else:
        # No usable node: rebuild the button row without a sync button.
        button_columns = urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Text("" )),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, cancel_button),
        ])
        dialog_body = urwid.Pile([
            urwid.Text(""),
            urwid.Text("No trusted nodes found, cannot sync!\n", align=urwid.CENTER),
            urwid.Text(
                "To synchronise messages from the network, "
                "one or more nodes must be marked as trusted in the Known Nodes list, "
                "or a node must manually be selected as the default propagation node. "
                "Nomad Network will then automatically sync from the nearest trusted node, "
                "or the manually selected one.",
                align=urwid.LEFT,
            ),
            urwid.Text(""),
            button_columns
        ])

    dialog = DialogLineBox(dialog_body, title="Message Sync")
    dialog.delegate = self

    # Handles that update_sync_dialog reads back from the dialog.
    dialog.sync_progress = sync_progress
    dialog.cancel_button = cancel_button
    dialog.real_sync_button = real_sync_button
    dialog.hidden_sync_button = hidden_sync_button
    dialog.bc = button_columns

    self.sync_dialog = dialog

    overlay = urwid.Overlay(
        dialog,
        self.listbox,
        align=urwid.CENTER,
        width=urwid.RELATIVE_100,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )

    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)
    self.columns_widget.contents[0] = (overlay, options)
def update_sync_dialog(self, loop = None, sender = None):
    """Refresh the open sync dialog and schedule the next refresh.

    Does nothing, and schedules nothing, when no sync dialog is open.
    The two parameters are unused here; they match the arguments an
    alarm callback receives.
    """
    if not self.dialog_open or self.sync_dialog == None:
        return

    sync_dialog = self.sync_dialog
    sync_dialog.sync_progress.set_completion(self.app.get_sync_progress())

    sync_is_startable = (
        self.app.get_sync_status() == "Idle"
        or self.app.message_router.propagation_transfer_state >= LXMF.LXMRouter.PR_COMPLETE
    )
    if sync_is_startable:
        shown_button = sync_dialog.real_sync_button
    else:
        shown_button = sync_dialog.hidden_sync_button
    sync_dialog.bc.contents[0] = (shown_button, sync_dialog.bc.options(urwid.WEIGHT, 0.45))

    # Run again in 0.2 seconds for as long as the dialog stays open.
    self.app.ui.loop.set_alarm_in(0.2, self.update_sync_dialog)
def conversation_list_selection(self, arg1, arg2):
    """Selection-change hook for the list box; intentionally does nothing."""
    return None
def update_conversation_list(self):
    """Rebuild the list column, keeping the current selection where possible.

    The list is rebuilt and put back into column 0, wrapped in the sync
    dialog overlay when that dialog is open. The previously selected
    conversation is re-selected by source hash, and the screen is
    redrawn. If the conversations display is the active one and the
    displayed conversation is unread, it is marked read and its
    "unread" marker file is removed.

    Raises:
        OSError: if the marker file exists but cannot be removed.
    """
    selected_hash = None
    selected_item = self.ilb.get_selected_item()
    if selected_item is not None and hasattr(selected_item, "source_hash"):
        selected_hash = selected_item.source_hash

    self.update_listbox()
    options = self.columns_widget.options(urwid.GIVEN, ConversationsDisplay.given_list_width)

    if not (self.dialog_open and self.sync_dialog != None):
        self.columns_widget.contents[0] = (self.listbox, options)
    else:
        # The sync dialog outlives list rebuilds: lay it over the new list.
        bottom = self.listbox
        overlay = urwid.Overlay(
            self.sync_dialog,
            bottom,
            align=urwid.CENTER,
            width=urwid.RELATIVE_100,
            valign=urwid.MIDDLE,
            height=urwid.PACK,
            left=2,
            right=2,
        )
        self.columns_widget.contents[0] = (overlay, options)

    if selected_hash is not None:
        for idx, widget in enumerate(self.list_widgets):
            if widget.source_hash == selected_hash:
                self.ilb.select_item(idx)
                break

    nomadnet.NomadNetworkApp.get_shared_instance().ui.loop.draw_screen()

    sub_displays = self.app.ui.main_display.sub_displays
    if sub_displays.active_display != sub_displays.conversations_display:
        return

    current = self.currently_displayed_conversation
    if current is None or not self.app.conversation_is_unread(current):
        return

    self.app.mark_conversation_read(current)
    unread_marker = os.path.join(self.app.conversationpath, current, "unread")
    if os.path.isfile(unread_marker):
        try:
            os.unlink(unread_marker)
        except FileNotFoundError:
            # Removed by someone else between the check and the unlink;
            # the marker is gone either way.
            pass
def display_conversation(self, sender=None, source_hash=None):
    """Show the conversation for ``source_hash`` in the right-hand column.

    Args:
        sender: Not used by this method; it occupies the first position so
            the method can be connected directly as a "click" handler.
        source_hash: Conversation to show, or None for the placeholder.
    """
    previous = self.currently_displayed_conversation
    if previous != None and self.app.conversation_is_unread(previous):
        self.app.mark_conversation_read(previous)

    self.currently_displayed_conversation = source_hash
    pane_options = self.widget.options(urwid.WEIGHT, 1)
    self.widget.contents[1] = (self.make_conversation_widget(source_hash), pane_options)

    if source_hash == None:
        self.widget.focus_position = 0
        return

    if self.app.conversation_is_unread(source_hash):
        self.app.mark_conversation_read(source_hash)
        self.update_conversation_list()

    self.widget.focus_position = 1

    # No early exit: if several entries match, the last one is selected.
    conversation_position = None
    for index, list_entry in enumerate(self.list_widgets):
        if list_entry.source_hash == source_hash:
            conversation_position = index

    if conversation_position != None:
        self.ilb.select_item(conversation_position)
def make_conversation_widget(self, source_hash):
    """Return the conversation widget for ``source_hash``, creating it if needed.

    Widgets are cached on the class by source hash. A cached widget for a
    real conversation has its message widgets rebuilt before it is
    returned; ``None`` maps to the placeholder widget.
    """
    cache = ConversationsDisplay.cached_conversation_widgets

    if source_hash not in cache:
        created = ConversationWidget(source_hash)
        created.delegate = self
        cache[source_hash] = created
        created.check_editor_allowed()
        return created

    cached = cache[source_hash]
    if source_hash != None:
        cached.update_message_widgets(replace=True)
    cached.check_editor_allowed()
    return cached
def close_conversation_by_hash(self, conversation_hash):
if conversation_hash in ConversationsDisplay.cached_conversation_widgets:
ConversationsDisplay.cached_conversation_widgets.pop(conversation_hash)
if self.currently_displayed_conversation == conversation_hash:
self.display_conversation(sender=None, source_hash=None)
def close_conversation(self, conversation):
if conversation.source_hash in ConversationsDisplay.cached_conversation_widgets:
ConversationsDisplay.cached_conversation_widgets.pop(conversation.source_hash)
if self.currently_displayed_conversation == conversation.source_hash:
self.display_conversation(sender=None, source_hash=None)
def conversation_list_widget(self, conversation):
    """Build the list entry widget for one conversation record.

    Args:
        conversation: Indexable record; this method reads index 0 (source
            hash), 1 (display name), 2 (trust level), 4 (unread value)
            and 5 (last-activity timestamp).

    Returns:
        An ``urwid.AttrMap`` around a clickable ListEntry, carrying
        ``source_hash`` and ``display_name`` attributes.
    """
    source_hash = conversation[0]
    display_name = conversation[1]
    trust_level = conversation[2]
    unread = conversation[4]
    last_activity = conversation[5]

    g = self.app.ui.glyphs

    if trust_level == DirectoryEntry.UNTRUSTED:
        symbol, style, focus_style = g["cross"], "list_untrusted", "list_focus_untrusted"
    elif trust_level == DirectoryEntry.UNKNOWN:
        symbol, style, focus_style = "?", "list_unknown", "list_focus"
    elif trust_level == DirectoryEntry.TRUSTED:
        symbol, style, focus_style = g["check"], "list_trusted", "list_focus_trusted"
    elif trust_level == DirectoryEntry.WARNING:
        symbol, style, focus_style = g["warning"], "list_warning", "list_focus"
    else:
        # Unrecognised trust levels are drawn as untrusted, with a warning glyph.
        symbol, style, focus_style = g["warning"], "list_untrusted", "list_focus_untrusted"

    text_parts = [symbol]

    if display_name != None and display_name != "":
        text_parts.append(" "+display_name)

    # The address is appended for every peer that is not trusted.
    if trust_level != DirectoryEntry.TRUSTED:
        text_parts.append(" <"+source_hash+">")

    # The unread marker is never shown for untrusted peers, nor for the
    # conversation that is currently displayed.
    if trust_level != DirectoryEntry.UNTRUSTED and unread and source_hash != self.currently_displayed_conversation:
        if unread > 1:
            text_parts.append(" "+g["unread"]+" ("+str(unread)+")")
        else:
            text_parts.append(" "+g["unread"])

    if last_activity > 0:
        text_parts.append("\n "+relative_time(last_activity))

    entry = ListEntry("".join(text_parts))
    urwid.connect_signal(entry, "click", self.display_conversation, source_hash)

    display_widget = urwid.AttrMap(entry, style, focus_style)
    display_widget.source_hash = source_hash
    display_widget.display_name = display_name
    return display_widget
def shortcuts(self):
focus_path = self.widget.get_focus_path()
if focus_path[0] == 0:
return self.list_shortcuts
elif focus_path[0] == 1:
return self.editor_shortcuts
else:
return self.list_shortcuts
class ListEntry(urwid.Text):
    """Selectable text widget that emits "click" when activated."""

    _selectable = True

    signals = ["click"]

    def keypress(self, size, key):
        """
        Emit 'click' for the 'activate' command; hand back any other key.
        """
        if self._command_map[key] == urwid.ACTIVATE:
            self._emit('click')
            return None
        return key

    def mouse_event(self, size, event, button, x, y, focus):
        """
        Emit 'click' on a button 1 press; report whether it was handled.
        """
        is_primary_press = button == 1 and urwid.util.is_mouse_press(event)
        if not is_primary_press:
            return False
        self._emit('click')
        return True
class MessageEdit(urwid.Edit):
    """Edit widget for composing messages, with editor shortcuts.

    ``delegate`` and ``name`` are assigned by the owner after construction.
    """

    def keypress(self, size, key):
        # Key -> name of the delegate method that handles it.
        delegate_actions = {
            "ctrl d": "send_message",
            "ctrl p": "paper_message",
            "ctrl a": "attach_file",
            "ctrl s": "save_focused_attachments",
            "ctrl k": "clear_editor",
        }
        if key in delegate_actions:
            getattr(self.delegate, delegate_actions[key])()
            return None

        # "up" on the first line of the topmost editor moves focus to the
        # message list: the title editor in full mode, else the content editor.
        if key == "up" and self.get_cursor_coords(size)[1] == 0:
            if self.delegate.full_editor_active:
                topmost_editor = "title_editor"
            else:
                topmost_editor = "content_editor"
            if self.name == topmost_editor:
                self.delegate.frame.focus_position = "body"
                return None

        return super(MessageEdit, self).keypress(size, key)
class ConversationFrame(urwid.Frame):
    """Frame around one conversation: message list body, editor footer.

    ``delegate`` is assigned by the owner after construction.
    """

    def keypress(self, size, key):
        if self.focus_position != "body":
            if key == "ctrl k":
                self.delegate.clear_editor()
                return None
            return super(ConversationFrame, self).keypress(size, key)

        # While a dialog is shown in the body it receives every key.
        dialog_shown = getattr(self.delegate, "dialog_active", False) or getattr(self.delegate, "dialog_open", False)
        if not dialog_shown:
            if key == "up" and self.delegate.messagelist.top_is_visible:
                nomadnet.NomadNetworkApp.get_shared_instance().ui.main_display.frame.focus_position = "header"
                return None
            if key == "down" and self.delegate.messagelist.bottom_is_visible:
                self.focus_position = "footer"
                return None

        return super(ConversationFrame, self).keypress(size, key)
class ConversationWidget(urwid.WidgetWrap):
def __init__(self, source_hash):
    """Build the widget for one conversation, or the placeholder for None.

    Args:
        source_hash: Source hash of the conversation, or None to build
            the "No conversation selected" placeholder (``self.frame``
            is then None).

    Caching is left to the caller: ``__init__`` cannot hand back a
    different instance, and returning anything but None from it raises
    TypeError, so a widget is always built here.
    """
    self.app = nomadnet.NomadNetworkApp.get_shared_instance()
    g = self.app.ui.glyphs

    if source_hash is None:
        self.frame = None
        display_widget = urwid.LineBox(urwid.Filler(urwid.Text("\n No conversation selected"), "top"))
        super().__init__(display_widget)
        return

    self.source_hash = source_hash
    self.conversation = nomadnet.Conversation(source_hash, nomadnet.NomadNetworkApp.get_shared_instance())
    self.message_widgets = []
    self.sort_by_timestamp = False
    self.updating_message_widgets = False
    self.pending_attachments = []
    self.dialog_active = False

    self.update_message_widgets()

    self.conversation.register_changed_callback(self.conversation_changed)

    title_editor = MessageEdit(caption="", edit_text="", multiline=False)
    title_editor.delegate = self
    title_editor.name = "title_editor"

    msg_editor = MessageEdit(caption="", edit_text="", multiline=True)
    msg_editor.delegate = self
    msg_editor.name = "content_editor"

    self.peer_info_widget = urwid.AttrMap(urwid.Text(""), "msg_header_sent")
    self._update_peer_info()

    header_widgets = [self.peer_info_widget]
    if self.conversation.trust_level == DirectoryEntry.UNTRUSTED:
        header_widgets.append(urwid.AttrMap(
            urwid.Padding(
                urwid.Text(g["warning"]+" Warning: Conversation with untrusted peer "+g["warning"], align=urwid.CENTER)),
            "msg_warning_untrusted",
        ))
    header = urwid.Pile(header_widgets)

    # Two editor layouts are prepared: content only (the initial footer),
    # and title plus content.
    self.minimal_editor = urwid.AttrMap(msg_editor, "msg_editor")
    self.minimal_editor.name = "minimal_editor"

    title_columns = urwid.Columns([
        (8, urwid.Text("Title")),
        urwid.AttrMap(title_editor, "msg_editor"),
    ])

    content_columns = urwid.Columns([
        (8, urwid.Text("Content")),
        urwid.AttrMap(msg_editor, "msg_editor")
    ])

    self.full_editor = urwid.Pile([
        title_columns,
        content_columns
    ])
    self.full_editor.name = "full_editor"

    self.content_editor = msg_editor
    self.title_editor = title_editor
    self.full_editor_active = False

    # self.messagelist is not assigned in this method; presumably the
    # update_message_widgets() call above creates it - confirm.
    self.frame = ConversationFrame(
        self.messagelist,
        header=header,
        footer=self.minimal_editor,
        focus_part="footer"
    )
    self.frame.delegate = self

    self.display_widget = urwid.LineBox(
        self.frame
    )

    super().__init__(self.display_widget)
def _update_peer_info(self):
    """Refresh the one-line peer summary shown in the conversation header.

    The line consists of the peer's display name on the left and, on the
    right, the outbound stamp cost (when one is known) followed by the hop
    count to the peer. The result is written into ``self.peer_info_widget``.
    """
    def san(name):
        if self.app.config["textui"]["sanitize_names"]: return sanitize_name(name)
        else: return strip_modifiers(name)

    g = self.app.ui.glyphs
    source_hash_bytes = bytes.fromhex(self.source_hash)

    display_name = self.app.directory.display_name(source_hash_bytes)
    # Look the stamp cost up once and reuse it below instead of asking the
    # router twice for the same destination.
    stamp_cost = self.app.message_router.get_outbound_stamp_cost(source_hash_bytes)

    # Announce app data is only needed as a fallback source for whichever
    # of the two values is still missing.
    app_data = None
    if display_name is None or stamp_cost is None:
        app_data = RNS.Identity.recall_app_data(source_hash_bytes)

    if display_name is None and app_data:
        announced_name = LXMF.display_name_from_app_data(app_data)
        # The announce may carry no usable name; never hand None to the
        # sanitising helpers, which expect text.
        if announced_name is not None:
            display_name = san(announced_name)
    if display_name is None:
        display_name = RNS.prettyhexrep(source_hash_bytes)

    if stamp_cost is None and app_data:
        stamp_cost = LXMF.stamp_cost_from_app_data(app_data)

    hops = RNS.Transport.hops_to(source_hash_bytes)
    if hops >= RNS.Transport.PATHFINDER_M:
        hops_str = "unknown"
    else:
        hops_str = str(hops)+" hop" + ("s" if hops != 1 else "")

    right_parts = []
    if stamp_cost is not None:
        right_parts.append("Stamp: "+str(stamp_cost))
    right_parts.append(g["speed"]+hops_str)

    left = " "+display_name
    right = " ".join(right_parts)+" "
    self.peer_info_widget.original_widget.set_text(left+" | "+right)
def clear_history_dialog(self):
    """Overlay a Yes/No confirmation for clearing the conversation history.

    "Yes" clears the history through ``self.conversation`` and then refreshes
    the view; "No" only refreshes the view. Refreshing replaces the frame
    body, which is what removes the overlay again.
    """
    def close_overlay(sender):
        self.dialog_open = False
        self.conversation_changed(None)

    def clear_and_close(sender):
        self.dialog_open = False
        self.conversation.clear_history()
        self.conversation_changed(None)

    prompt = urwid.Text("Clear conversation history\n", align=urwid.CENTER)
    button_row = urwid.Columns([
        (urwid.WEIGHT, 0.45, urwid.Button("Yes", on_press=clear_and_close)),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, urwid.Button("No", on_press=close_overlay)),
    ])
    confirm_box = DialogLineBox(urwid.Pile([prompt, button_row]), title="?")
    confirm_box.delegate = self

    overlay = urwid.Overlay(
        confirm_box,
        self.messagelist,
        align=urwid.CENTER,
        width=34,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )
    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def _build_footer(self):
    """Return the widget to install as the frame footer.

    Without pending attachments this is the active editor itself (full or
    minimal, depending on ``self.full_editor_active``). With pending
    attachments the editor is stacked below a one-line indicator listing
    the attached file names.
    """
    g = self.app.ui.glyphs
    editor = self.full_editor if self.full_editor_active else self.minimal_editor

    if not self.pending_attachments:
        return editor

    attachment_names = [os.path.basename(path) for path in self.pending_attachments]
    indicator = urwid.AttrMap(
        urwid.Text(g["file"]+" "+str(len(self.pending_attachments))+" file(s): "+", ".join(attachment_names)),
        "msg_header_sent",
    )
    footer = urwid.Pile([indicator, editor])
    # toggle_focus_area() identifies the focused frame part by its ``name``
    # attribute. Give the wrapper the wrapped editor's name so that Tab keeps
    # working while attachments are pending.
    footer.name = editor.name
    return footer
def toggle_editor(self):
    """Flip between the minimal and the full editor and rebuild the footer."""
    self.full_editor_active = not self.full_editor_active
    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)
def check_editor_allowed(self):
    """Install either the editor or a "peer unknown" notice as the footer.

    The directory of the shared app instance is asked whether the peer is
    known. If it is, the regular footer is installed; otherwise the footer is
    replaced with an explanatory notice. Does nothing when ``self.frame`` is
    falsy.
    """
    g = self.app.ui.glyphs
    if not self.frame:
        return

    directory = nomadnet.NomadNetworkApp.get_shared_instance().directory
    if directory.is_known(bytes.fromhex(self.source_hash)):
        footer = self._build_footer()
    else:
        notice = (
            "\n"+g["info"]+"\n\nYou cannot currently message this peer, since its identity keys are not known. "
            "The keys have been requested from the network and should arrive shortly, if available. "
            "Close this conversation and reopen it to try again.\n\n"
            "To query the network manually, select this conversation in the conversation list, "
            "press Ctrl-E, and use the query button.\n"
        )
        footer = urwid.AttrMap(
            urwid.Padding(urwid.Text(notice, align=urwid.CENTER)),
            "msg_header_caution",
        )

    self.frame.contents["footer"] = (footer, None)
def toggle_focus_area(self):
    """Move focus between the message list and the editor.

    The focused frame part is identified by its ``name`` attribute. If the
    name cannot be read, or is none of the known ones, focus is left alone.
    """
    try:
        focused_name = self.frame.get_focus_widgets()[0].name
    except Exception:
        # Best effort: a focused widget without a name is simply not toggled.
        focused_name = ""

    if focused_name == "messagelist":
        self.frame.focus_position = "footer"
    elif focused_name in ("minimal_editor", "full_editor"):
        self.frame.focus_position = "body"
def keypress(self, size, key):
    """Handle the conversation shortcuts; pass every other key to the parent.

    Returns ``None`` for a shortcut handled here, otherwise whatever the
    parent class's ``keypress`` returns.
    """
    # Shortcuts that need more than a single method call.
    if key == "ctrl u":
        self.conversation.purge_failed()
        self.conversation_changed(None)
        return None
    if key == "ctrl o":
        self.sort_by_timestamp = not self.sort_by_timestamp
        self.conversation_changed(None)
        return None
    if key == "ctrl g":
        shared_app = nomadnet.NomadNetworkApp.get_shared_instance()
        shared_app.ui.main_display.sub_displays.conversations_display.toggle_fullscreen()
        return None

    # Shortcuts that map directly onto one method of this widget.
    simple_actions = {
        "tab": self.toggle_focus_area,
        "ctrl w": self.close,
        "ctrl t": self.toggle_editor,
        "ctrl x": self.clear_history_dialog,
        "ctrl a": self.attach_file,
        "ctrl s": self.save_focused_attachments,
    }
    action = simple_actions.get(key)
    if action is None:
        return super(ConversationWidget, self).keypress(size, key)
    action()
    return None
def conversation_changed(self, conversation):
    """Refresh the header (once it exists) and rebuild the message list.

    ``conversation`` is accepted for the callback signature but not used.
    """
    header_ready = hasattr(self, "peer_info_widget")
    if header_ready:
        self._update_peer_info()
    self.update_message_widgets(replace=True)
def update_message_widgets(self, replace = False):
    """Rebuild ``self.message_widgets`` and ``self.messagelist``.

    One ``LXMessageWidget`` is created per distinct message hash. Messages
    that were loaded as a side effect of building their widget are written
    to the conversation index and unloaded again. Widgets are ordered by
    ``timestamp`` or ``sort_timestamp`` depending on
    ``self.sort_by_timestamp``.

    :param replace: when true, install the new list as the frame body and
        redraw the screen.
    """
    # Wait for a concurrent rebuild to finish before starting another one.
    while self.updating_message_widgets:
        time.sleep(0.5)

    self.updating_message_widgets = True
    try:
        self.message_widgets = []
        seen_hashes = set()
        needs_index = []

        for message in self.conversation.messages:
            message_hash = message.get_hash()
            if message_hash in seen_hashes:
                continue
            seen_hashes.add(message_hash)

            was_loaded = message.loaded
            self.message_widgets.append(LXMessageWidget(message))
            if not was_loaded and message.loaded:
                needs_index.append(message)
                message.unload()

        if needs_index:
            try:
                ConversationMessage.write_index(
                    self.conversation.messages_path, needs_index)
            except Exception as e:
                # Indexing is best effort; report the problem but keep
                # building the view.
                RNS.log("Could not write conversation message index: "+str(e), RNS.LOG_ERROR)

        if self.sort_by_timestamp:
            self.message_widgets.sort(key=lambda m: m.timestamp, reverse=False)
        else:
            self.message_widgets.sort(key=lambda m: m.sort_timestamp, reverse=False)

        from nomadnet.vendor.additional_urwid_widgets import IndicativeListBox
        self.messagelist = IndicativeListBox(self.message_widgets, position = len(self.message_widgets)-1)
        self.messagelist.name = "messagelist"

        if replace:
            self.frame.contents["body"] = (self.messagelist, None)
            nomadnet.NomadNetworkApp.get_shared_instance().ui.loop.draw_screen()
    finally:
        # Always release the flag: if it stayed set after an exception, every
        # later call would wait in the loop above forever.
        self.updating_message_widgets = False
def clear_editor(self):
    """Empty both edit fields, drop pending attachments, rebuild the footer."""
    for editor in (self.content_editor, self.title_editor):
        editor.set_edit_text("")
    self.pending_attachments = []
    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)
def _collect_attachment_refs(self):
    """List every attachment in the conversation, newest message first.

    Returns a list of ``(label, name, message, type, index)`` tuples. For
    ``"file"`` attachments ``index`` counts the file entries within their
    message; for every other type it is ``0``.
    """
    g = self.app.ui.glyphs
    newest_first = sorted(self.conversation.messages, key=lambda m: m.sort_timestamp, reverse=True)

    refs = []
    for conv_message in newest_first:
        if conv_message.has_attachments():
            file_counter = 0
            for entry in (conv_message._cached_attachment_names or []):
                atype, aname, *extra = entry
                # An optional third element carries the size.
                asize = extra[0] if extra else 0
                label = g[atype]+" "+aname
                if asize > 0:
                    label += " ("+_format_size(asize)+")"

                if atype == "file":
                    refs.append((label, aname, conv_message, "file", file_counter))
                    file_counter += 1
                else:
                    refs.append((label, aname, conv_message, atype, 0))
    return refs
def save_focused_attachments(self):
    """Show a dialog for copying attachments of this conversation to disk.

    Every attachment found by ``_collect_attachment_refs`` is offered as a
    check box. The copy button copies each ticked attachment into the
    configured save directory and reports the outcome inside the dialog. If
    the conversation has no attachments, a short notice is shown instead.
    """
    g = self.app.ui.glyphs
    self.dialog_active = True

    try:
        attachment_items = self._collect_attachment_refs()
    except Exception as e:
        RNS.log("Error collecting attachments: "+str(e), RNS.LOG_ERROR)
        attachment_items = []

    save_dir = self.app.attachment_save_path if self.app.attachment_save_path else self.app.downloads_path

    def dismiss_dialog(sender):
        self.dialog_active = False
        self.conversation_changed(None)

    def show_dialog(dialog, width, height):
        # Put the dialog over the message list and give it focus.
        dialog.delegate = self
        overlay = urwid.Overlay(dialog, self.messagelist, align=urwid.CENTER, width=width, valign=urwid.MIDDLE, height=height, left=2, right=2)
        self.frame.contents["body"] = (overlay, self.frame.options())
        self.frame.focus_position = "body"

    if not attachment_items:
        dialog = DialogLineBox(
            urwid.Pile([
                urwid.Text("No attachments in this conversation.\n"),
                urwid.Columns([
                    (urwid.WEIGHT, 0.6, urwid.Text("")),
                    (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=dismiss_dialog)),
                ])
            ]), title="Attachments"
        )
        show_dialog(dialog, 45, urwid.PACK)
        return

    checkboxes = []
    for label, filename, conv_msg, field_type, field_index in attachment_items:
        cb = urwid.CheckBox(label, state=False)
        # Remember on the check box which attachment it stands for.
        cb._attachment_filename = filename
        cb._conv_message = conv_msg
        cb._field_type = field_type
        cb._field_index = field_index
        checkboxes.append(cb)

    status_text = urwid.Text("")

    def do_save(sender):
        saved = []
        errors = []
        for cb in checkboxes:
            if not cb.get_state():
                continue
            try:
                src_path = cb._conv_message.get_attachment_file_path(cb._field_type, cb._field_index)
                if src_path and os.path.isfile(src_path):
                    saved.append(_copy_attachment_to_dest(cb._attachment_filename, src_path))
                else:
                    # A ticked attachment without a stored file is a failure,
                    # not "nothing selected".
                    errors.append(str(cb._attachment_filename)+": stored file not found")
            except Exception as e:
                errors.append(str(e))

        if saved:
            lines = [g["check"]+" Copied "+str(len(saved))+" file(s) to "+save_dir+":"]
            for p in saved:
                lines.append(" "+os.path.basename(p))
            if errors:
                lines.append(g["cross"]+" "+str(len(errors))+" failed")
            status_text.set_text("\n".join(lines))
        elif errors:
            status_text.set_text(g["cross"]+" Failed: "+errors[0])
        else:
            status_text.set_text("No files selected")

    dialog_widgets = list(checkboxes)
    dialog_widgets.append(urwid.Divider(g["divider1"]))
    dialog_widgets.append(urwid.Text("Copy to: "+save_dir))
    dialog_widgets.append(status_text)
    dialog_widgets.append(urwid.Text(""))
    dialog_widgets.append(urwid.Columns([
        (urwid.WEIGHT, 0.45, urwid.Button("Copy to Downloads", on_press=do_save)),
        (urwid.WEIGHT, 0.1, urwid.Text("")),
        (urwid.WEIGHT, 0.45, urwid.Button("Close", on_press=dismiss_dialog)),
    ]))

    dialog = DialogLineBox(urwid.ListBox(urwid.SimpleFocusListWalker(dialog_widgets)), title="Attachments")
    show_dialog(dialog, ("relative", 80), ("relative", 80))
def send_message(self):
    """Send the editor content, with any pending attachments, to the peer.

    Nothing is sent while the content field is empty. Attachments that
    cannot be read are logged and left out. The editor is cleared only when
    the conversation reports the send as successful.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    fields = None
    if self.pending_attachments:
        readable = []
        for file_path in self.pending_attachments:
            try:
                with open(file_path, "rb") as handle:
                    payload = handle.read()
                readable.append([os.path.basename(file_path), payload])
            except Exception as e:
                RNS.log("Error reading attachment "+str(file_path)+": "+str(e), RNS.LOG_ERROR)
        if readable:
            fields = {LXMF.FIELD_FILE_ATTACHMENTS: readable}

    sent = self.conversation.send(content, title, fields=fields)
    if sent:
        self.clear_editor()
def attach_file(self):
    """Open the file browser as an overlay on top of the message list."""
    self.dialog_active = True
    overlay = urwid.Overlay(
        FileBrowserDialog(self),
        self.messagelist,
        align=urwid.CENTER,
        width=("relative", 90),
        valign=urwid.MIDDLE,
        height=("relative", 80),
        left=2,
        right=2,
    )
    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def file_browser_closed(self):
    """Called when the file browser is dismissed: rebuild footer and view."""
    self.dialog_active = False
    rebuilt_footer = self._build_footer()
    self.frame.contents["footer"] = (rebuilt_footer, None)
    self.conversation_changed(None)
def paper_message_saved(self, path):
    """Show a dialog reporting where a paper message was written.

    :param path: location shown to the user (converted with ``str``).
    """
    g = self.app.ui.glyphs

    def close_overlay(sender):
        self.dialog_open = False
        self.conversation_changed(None)

    notice = urwid.Text("The paper message was saved to:\n\n"+str(path)+"\n", align=urwid.CENTER)
    ok_row = urwid.Columns([
        (urwid.WEIGHT, 0.6, urwid.Text("")),
        (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=close_overlay)),
    ])
    dialog = DialogLineBox(urwid.Pile([notice, ok_row]), title=g["papermsg"].replace(" ", ""))
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=60,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )
    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def print_paper_message_qr(self):
    """Output the editor content as a paper message in the default mode.

    Does nothing while the content field is empty. Clears the editor on
    success and shows the failure dialog otherwise.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    if not self.conversation.paper_output(content, title):
        self.paper_message_failed()
        return
    self.clear_editor()
def save_paper_message_qr(self):
    """Write the editor content as a paper message using mode ``"save_qr"``.

    Does nothing while the content field is empty. Any result other than
    ``False`` counts as success: the editor is cleared and the result is
    shown as the saved location.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    output_result = self.conversation.paper_output(content, title, mode="save_qr")
    if output_result == False:
        self.paper_message_failed()
    else:
        self.clear_editor()
        self.paper_message_saved(output_result)
def save_paper_message_uri(self):
    """Write the editor content as a paper message using mode ``"save_uri"``.

    Does nothing while the content field is empty. Any result other than
    ``False`` counts as success: the editor is cleared and the result is
    shown as the saved location.
    """
    content = self.content_editor.get_edit_text()
    title = self.title_editor.get_edit_text()
    if content == "":
        return

    output_result = self.conversation.paper_output(content, title, mode="save_uri")
    if output_result == False:
        self.paper_message_failed()
    else:
        self.clear_editor()
        self.paper_message_saved(output_result)
def paper_message(self):
    """Show the dialog for choosing how to output a paper message.

    Offers printing a QR code, saving a QR code, saving a URI, or cancelling.
    Each output choice first dismisses the dialog and then runs the matching
    method of this widget.
    """
    def dismiss_dialog(sender):
        self.dialog_open = False
        self.conversation_changed(None)

    def dismiss_then(action):
        # Build a button handler that closes the dialog before acting.
        def handler(sender):
            dismiss_dialog(self)
            action()
        return handler

    choices = [
        ("Print QR", dismiss_then(self.print_paper_message_qr)),
        ("Save QR", dismiss_then(self.save_paper_message_qr)),
        ("Save URI", dismiss_then(self.save_paper_message_uri)),
        ("Cancel", dismiss_dialog),
    ]
    button_cells = []
    for position, (caption, handler) in enumerate(choices):
        if position > 0:
            # Narrow empty spacer between neighbouring buttons.
            button_cells.append((urwid.WEIGHT, 0.1, urwid.Text("")))
        button_cells.append((urwid.WEIGHT, 0.5, urwid.Button(caption, on_press=handler)))

    explanation = urwid.Text(
        "Select the desired paper message output method.\nSaved files will be written to:\n\n"+str(self.app.downloads_path)+"\n",
        align=urwid.CENTER,
    )
    dialog = DialogLineBox(
        urwid.Pile([explanation, urwid.Columns(button_cells)]),
        title="Create Paper Message",
    )
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=60,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )
    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def paper_message_failed(self):
    """Show a dialog telling the user that paper message output failed."""
    def close_overlay(sender):
        self.dialog_open = False
        self.conversation_changed(None)

    explanation = urwid.Text(
        "Could not output paper message,\ncheck your settings. See the log\nfile for any error messages.\n",
        align=urwid.CENTER,
    )
    ok_row = urwid.Columns([
        (urwid.WEIGHT, 0.6, urwid.Text("")),
        (urwid.WEIGHT, 0.4, urwid.Button("OK", on_press=close_overlay)),
    ])
    dialog = DialogLineBox(urwid.Pile([explanation, ok_row]), title="!")
    dialog.delegate = self

    overlay = urwid.Overlay(
        dialog,
        self.messagelist,
        align=urwid.CENTER,
        width=34,
        valign=urwid.MIDDLE,
        height=urwid.PACK,
        left=2,
        right=2,
    )
    self.frame.contents["body"] = (overlay, self.frame.options())
    self.frame.focus_position = "body"
def close(self):
    """Ask the delegate to close this conversation."""
    owner = self.delegate
    owner.close_conversation(self)
class LXMessageWidget(urwid.WidgetWrap):
    """Widget for one message: header line, body text and attachment rows.

    For an outbound message that is still pending in the message router as a
    resource transfer, a textual progress bar is shown and refreshed by a
    repeating main-loop alarm until the transfer finishes or fails.
    """

    # Width of the textual progress bar, in characters.
    PROGRESS_BAR_WIDTH = 20
    # Seconds between two refreshes of the progress bar.
    PROGRESS_POLL_INTERVAL = 0.3

    def __init__(self, message):
        """Build the widget for ``message`` (a conversation message object)."""
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        g = app.ui.glyphs
        self.timestamp = message.get_timestamp()
        self.sort_timestamp = message.sort_timestamp
        self.transfer_done = False
        self._live_lxm = None

        msg_hash = message.get_hash()
        msg_state = message.get_state()
        msg_source_hash = message._cached_source_hash
        msg_method = message._cached_method
        time_format = app.time_format
        message_time = datetime.fromtimestamp(self.timestamp)

        if message.get_transport_encrypted():
            encryption_string = " "+g["encrypted"]
        else:
            encryption_string = " "+g["plaintext"]

        title_string = relative_time(self.timestamp)+" | "+message_time.strftime(time_format)+encryption_string

        # Pick header style and prefix from direction and delivery state.
        is_outbound = False
        if msg_source_hash is None:
            header_style = "msg_header_failed"
            title_string = g["warning"]+" "+title_string
        elif app.lxmf_destination.hash == msg_source_hash:
            is_outbound = True
            if msg_state == LXMF.LXMessage.DELIVERED:
                header_style = "msg_header_delivered"
                title_string = g["check"]+" "+g["arrow_r"]+" "+title_string
            elif msg_state == LXMF.LXMessage.FAILED:
                header_style = "msg_header_failed"
                title_string = g["cross"]+" "+g["arrow_r"]+" "+title_string
            elif msg_state == LXMF.LXMessage.REJECTED:
                header_style = "msg_header_failed"
                title_string = g["cross"]+" "+g["arrow_r"]+" Rejected "+title_string
            elif msg_method == LXMF.LXMessage.PROPAGATED and msg_state == LXMF.LXMessage.SENT:
                header_style = "msg_header_propagated"
                title_string = g["sent"]+" "+g["arrow_r"]+" "+title_string
            elif msg_method == LXMF.LXMessage.PAPER and msg_state == LXMF.LXMessage.PAPER:
                header_style = "msg_header_propagated"
                title_string = g["papermsg"]+" "+g["arrow_r"]+" "+title_string
            elif msg_state == LXMF.LXMessage.SENT:
                header_style = "msg_header_sent"
                title_string = g["sent"]+" "+g["arrow_r"]+" "+title_string
            else:
                header_style = "msg_header_sent"
                title_string = g["arrow_r"]+" "+title_string
        else:
            if message.signature_validated():
                header_style = "msg_header_ok"
                title_string = g["check"]+" "+g["arrow_l"]+" "+title_string
            else:
                header_style = "msg_header_caution"
                title_string = g["warning"]+" "+g["arrow_l"]+" "+message.get_signature_description() + "\n " + title_string

        # Guard against a missing title so one odd message cannot break the
        # construction of the whole message list.
        msg_title = message.get_title()
        if msg_title:
            title_string += " | " + msg_title

        has_attachments = message.has_attachments()
        cached_names = message._cached_attachment_names or []
        if has_attachments and cached_names:
            attachment_strings = []
            for atype, aname, *arest in cached_names:
                attachment_strings.append(g[atype]+" "+aname)
            title_string += " | " + " ".join(attachment_strings)

        title = urwid.AttrMap(urwid.Text(title_string), header_style)

        self.progress_widget = urwid.Text("")
        self.progress_attr = urwid.AttrMap(self.progress_widget, "progress_full")

        # Same guard as for the title: treat missing content as empty text.
        content_text = message.get_content() or ""
        content_lines = content_text.split("\n")
        indented = "\n".join(" "+line for line in content_lines)

        pile_widgets = [title]

        # Outbound and not yet sent: look for the matching pending message in
        # the router so that its transfer progress can be displayed.
        if is_outbound and msg_state is not None and msg_state < LXMF.LXMessage.SENT and msg_hash is not None:
            try:
                for pending in app.message_router.pending_outbound:
                    if pending.hash == msg_hash:
                        if pending.representation == LXMF.LXMessage.RESOURCE:
                            self._live_lxm = pending
                        break
            except Exception:
                # Best effort: without a live message there is no progress bar.
                pass

        if self._live_lxm is not None:
            self.progress_widget.set_text(self._progress_bar_text(app, self._live_lxm.progress))
            pile_widgets.append(self.progress_attr)
            self._start_progress_poll()

        pile_widgets.append(urwid.Text(indented))

        if has_attachments and cached_names:
            att_file_idx = 0
            for atype, aname, *arest in cached_names:
                glyph = g[atype]
                # An optional third element carries the size.
                asize = arest[0] if arest else 0
                label = " "+glyph+" "+aname
                if asize > 0:
                    label += " ("+_format_size(asize)+")"
                if atype == "file":
                    pile_widgets.append(ClickableAttachment(label, aname, message, "file", att_file_idx))
                    att_file_idx += 1
                else:
                    pile_widgets.append(ClickableAttachment(label, aname, message, atype))

        pile_widgets.append(urwid.Text(""))

        super().__init__(urwid.Pile(pile_widgets))

    @classmethod
    def _progress_bar_text(cls, app, progress):
        """Return the progress line for ``progress``, a fraction of 1.

        Block characters are used when the UI colour mode is at least 256,
        plain ``#`` and ``-`` otherwise.
        """
        pct = int(progress * 100)
        filled = int(cls.PROGRESS_BAR_WIDTH * progress)
        remaining = cls.PROGRESS_BAR_WIDTH - filled
        if app.ui.colormode >= 256:
            bar = "\u2588" * filled + "\u2591" * remaining
        else:
            bar = "#" * filled + "-" * remaining
        return " ["+bar+"] "+str(pct)+"%"

    def _start_progress_poll(self):
        """Schedule the first progress poll on the UI main loop, if any."""
        try:
            loop = nomadnet.NomadNetworkApp.get_shared_instance().ui.loop
            if loop:
                loop.set_alarm_in(self.PROGRESS_POLL_INTERVAL, self._poll_progress)
        except Exception:
            # Best effort: without a loop the bar simply stays static.
            pass

    def _poll_progress(self, loop=None, user_data=None):
        """Alarm callback: refresh the progress line and re-arm the alarm.

        Polling stops once the live message is failed, rejected or has
        reached at least the sent state. Both arguments are accepted for the
        callback signature and not used.
        """
        if self.transfer_done:
            return
        if self._live_lxm is None:
            self.transfer_done = True
            return

        app = nomadnet.NomadNetworkApp.get_shared_instance()
        g = app.ui.glyphs
        progress = self._live_lxm.progress
        state = self._live_lxm.state

        if state == LXMF.LXMessage.FAILED:
            self.progress_widget.set_text(" "+g["cross"]+" Transfer failed")
            self.transfer_done = True
            self._live_lxm = None
        elif state == LXMF.LXMessage.REJECTED:
            self.progress_widget.set_text(" "+g["cross"]+" Rejected: too large or not accepted")
            self.transfer_done = True
            self._live_lxm = None
        elif state >= LXMF.LXMessage.SENT:
            self.progress_widget.set_text("")
            self.transfer_done = True
            self._live_lxm = None
        else:
            self.progress_widget.set_text(self._progress_bar_text(app, progress))

        if not self.transfer_done:
            try:
                ui_loop = app.ui.loop
                if ui_loop:
                    ui_loop.set_alarm_in(self.PROGRESS_POLL_INTERVAL, self._poll_progress)
                    ui_loop.draw_screen()
            except Exception:
                pass
class ClickableAttachment(urwid.Text):
    """Text row for one attachment that saves the attachment when clicked.

    After a successful save the row text is replaced by the destination
    path and further clicks are ignored.
    """

    def __init__(self, label, filename, conv_message, field_type, field_index=0):
        """Remember which attachment of ``conv_message`` this row stands for."""
        self.filename = filename
        self.conv_message = conv_message
        self.field_type = field_type
        self.field_index = field_index
        self.saved = False
        super().__init__(label)

    def mouse_event(self, size, event, button, x, y, focus):
        """Save on a press of button 1; report every other event as unhandled."""
        is_primary_press = button == 1 and urwid.util.is_mouse_press(event)
        if not is_primary_press:
            return False
        self._save()
        return True

    def _payload_from_message(self):
        """Return the attachment bytes held by the message, or ``b""``."""
        kind = self.field_type
        if kind == "file":
            attachments = self.conv_message.get_file_attachments()
            if self.field_index >= len(attachments):
                return b""
            att = attachments[self.field_index]
            if not (isinstance(att, list) and len(att) >= 2):
                return b""
            return att[1] if isinstance(att[1], bytes) else b""

        if kind == "image":
            data = self.conv_message.get_image()
        elif kind == "audio":
            data = self.conv_message.get_audio()
        else:
            return b""
        return data if isinstance(data, bytes) else b""

    def _save(self):
        """Copy or write the attachment to the save directory once.

        The stored attachment file is copied when it exists; otherwise the
        bytes are taken from the message itself. Failures are logged and
        shown in the row text.
        """
        if self.saved:
            return

        app = nomadnet.NomadNetworkApp.get_shared_instance()
        g = app.ui.glyphs
        try:
            src_path = self.conv_message.get_attachment_file_path(self.field_type, self.field_index)
            if src_path and os.path.isfile(src_path):
                save_path = _copy_attachment_to_dest(self.filename, src_path)
            else:
                data = self._payload_from_message()
                # Release the message again once its payload has been read.
                self.conv_message.unload()
                if not data:
                    return
                save_path = _save_attachment_to_disk(self.filename, data)

            self.saved = True
            self.set_text(" "+g["check"]+" Copied to: "+save_path)
        except Exception as e:
            RNS.log("Error saving attachment: "+str(e), RNS.LOG_ERROR)
            self.set_text(" "+g["cross"]+" Save failed: "+str(e))
def _copy_attachment_to_dest(filename, src_path):
    """Copy ``src_path`` into the attachment save directory as ``filename``.

    The directory is the app's ``attachment_save_path`` or, when that is
    unset, its ``downloads_path``; it is created if missing. If the target
    name is already taken, ``_1``, ``_2``, ... is inserted before the
    extension until a free name is found.

    :param filename: name to store the file under. It originates from the
        message, so only its final path component is used.
    :param src_path: existing file to copy.
    :returns: path of the written copy.
    """
    app = nomadnet.NomadNetworkApp.get_shared_instance()
    save_dir = app.attachment_save_path if app.attachment_save_path else app.downloads_path
    os.makedirs(save_dir, exist_ok=True)

    # Never let an attachment name escape the save directory: drop any
    # directory part, whichever separator style it uses.
    safe_name = os.path.basename(filename.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "attachment"

    base, ext = os.path.splitext(safe_name)
    save_path = os.path.join(save_dir, safe_name)
    counter = 0
    # lexists also catches directories and dangling symlinks, which must not
    # be used as a copy target either.
    while os.path.lexists(save_path):
        counter += 1
        save_path = os.path.join(save_dir, base+"_"+str(counter)+ext)

    shutil.copy2(src_path, save_path)
    return save_path
def _save_attachment_to_disk(filename, data):
    """Write ``data`` into the attachment save directory as ``filename``.

    The directory is the app's ``attachment_save_path`` or, when that is
    unset, its ``downloads_path``; it is created if missing. If the target
    name is already taken, ``_1``, ``_2``, ... is inserted before the
    extension until a free name is found.

    :param filename: name to store the data under. It originates from the
        message, so only its final path component is used.
    :param data: bytes to write.
    :returns: path of the written file.
    """
    app = nomadnet.NomadNetworkApp.get_shared_instance()
    save_dir = app.attachment_save_path if app.attachment_save_path else app.downloads_path
    os.makedirs(save_dir, exist_ok=True)

    # Never let an attachment name escape the save directory: drop any
    # directory part, whichever separator style it uses.
    safe_name = os.path.basename(filename.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "attachment"

    base, ext = os.path.splitext(safe_name)
    save_path = os.path.join(save_dir, safe_name)
    counter = 0
    # lexists also catches directories and dangling symlinks, which must not
    # be opened for writing either.
    while os.path.lexists(save_path):
        counter += 1
        save_path = os.path.join(save_dir, base+"_"+str(counter)+ext)

    with open(save_path, "wb") as f:
        f.write(data)
    return save_path
class FileBrowserEntry(urwid.WidgetWrap):
    """One selectable row of the file browser.

    Emits the ``click`` signal on Enter or on a press of mouse button 1.
    """

    signals = ["click"]

    def __init__(self, name, full_path, is_dir=False, is_parent=False, selected=False):
        """Create the row; the flags choose its glyph prefix and colours."""
        self.full_path = full_path
        self.name = name
        self.is_dir = is_dir
        self.is_parent = is_parent
        self.selected = selected

        glyphs = nomadnet.NomadNetworkApp.get_shared_instance().ui.glyphs

        # Parent link and directories share colours; a selected file gets
        # its own focus colour; everything else is a plain file.
        if is_parent:
            caption = glyphs["arrow_l"]+" .."
            normal_attr, focus_attr = "list_trusted", "list_focus"
        elif is_dir:
            caption = glyphs["arrow_r"]+" "+name+"/"
            normal_attr, focus_attr = "list_trusted", "list_focus"
        elif selected:
            caption = glyphs["check"]+" "+name
            normal_attr, focus_attr = "list_trusted", "list_focus_trusted"
        else:
            caption = " "+name
            normal_attr, focus_attr = "list_unknown", "list_focus"

        self.text_widget = urwid.SelectableIcon(caption, 0)
        super().__init__(urwid.AttrMap(self.text_widget, normal_attr, focus_attr))

    def keypress(self, size, key):
        """Emit ``click`` on Enter; hand every other key back unhandled."""
        if key != "enter":
            return key
        self._emit("click")

    def mouse_event(self, size, event, button, x, y, focus):
        """Emit ``click`` on a press of button 1; ignore other events."""
        is_primary_press = button == 1 and urwid.util.is_mouse_press(event)
        if not is_primary_press:
            return False
        self._emit("click")
        return True
class FileBrowserDialog(urwid.WidgetWrap):
    """File browser for toggling files in ``delegate.pending_attachments``.

    The dialog starts in the user's home directory, lists directories before
    files and skips names starting with a dot. Clicking a directory
    navigates into it; clicking a file adds it to, or removes it from, the
    delegate's pending attachments.
    """

    def __init__(self, delegate):
        """Build the dialog for ``delegate`` (the owning conversation widget)."""
        self.delegate = delegate
        app = nomadnet.NomadNetworkApp.get_shared_instance()
        self.g = app.ui.glyphs
        self.current_path = os.path.expanduser("~")

        self.path_label = urwid.Text("")
        self.status_label = urwid.Text("")
        self.file_walker = urwid.SimpleFocusListWalker([])
        self.file_listbox = urwid.ListBox(self.file_walker)

        self.button_columns = urwid.Columns([
            (urwid.WEIGHT, 0.45, urwid.Button("Done", on_press=self._dismiss)),
            (urwid.WEIGHT, 0.1, urwid.Text("")),
            (urwid.WEIGHT, 0.45, urwid.Button("Cancel", on_press=self._cancel)),
        ])

        header_pile = urwid.Pile([
            self.path_label,
            self.status_label,
            urwid.Divider(self.g["divider1"]),
        ])
        footer_pile = urwid.Pile([
            urwid.Divider(self.g["divider1"]),
            self.button_columns,
        ])

        self._populate()

        self.browser_frame = urwid.Frame(
            self.file_listbox,
            header=header_pile,
            footer=footer_pile,
        )
        linebox = urwid.LineBox(self.browser_frame, title="Attach File")
        super().__init__(linebox)

    def _update_status(self):
        """Show how many files are selected, with their names."""
        pending = self.delegate.pending_attachments
        if pending:
            names = [os.path.basename(p) for p in pending]
            self.status_label.set_text(" "+self.g["file"]+" "+str(len(pending))+" selected: "+", ".join(names))
        else:
            self.status_label.set_text(" No files selected")

    def _make_entry(self, name, full_path, **flags):
        """Create a browser row and route its ``click`` signal to this dialog."""
        entry = FileBrowserEntry(name, full_path, **flags)
        urwid.connect_signal(entry, "click", self._entry_clicked, entry)
        return entry

    def _populate(self):
        """Rebuild the listing for ``self.current_path``.

        The previous focus position is restored when it still exists in the
        new listing. A directory that cannot be read shows an error row
        instead of raising.
        """
        self.path_label.set_text(" "+self.current_path)
        self._update_status()

        focus_pos = None
        try:
            focus_pos = self.file_listbox.focus_position
        except Exception:
            # No focus position available yet (for example an empty list).
            pass

        entries = []
        parent = os.path.dirname(self.current_path)
        if parent != self.current_path:
            entries.append(self._make_entry("..", parent, is_parent=True))

        try:
            items = sorted(os.listdir(self.current_path))
        except PermissionError:
            entries.append(urwid.Text(("error_text", " Permission denied")))
            self.file_walker[:] = entries
            return
        except OSError as e:
            # The directory may have vanished or be unreadable for another
            # reason; keep the dialog usable so the user can navigate away.
            entries.append(urwid.Text(("error_text", " Cannot read directory: "+str(e.strerror or e))))
            self.file_walker[:] = entries
            return

        dirs = []
        files = []
        for item in items:
            if item.startswith("."):
                continue
            full = os.path.join(self.current_path, item)
            if os.path.isdir(full):
                dirs.append((item, full))
            elif os.path.isfile(full):
                files.append((item, full))

        for name, full in dirs:
            entries.append(self._make_entry(name, full, is_dir=True))
        for name, full in files:
            is_selected = full in self.delegate.pending_attachments
            entries.append(self._make_entry(name, full, selected=is_selected))

        if not dirs and not files:
            entries.append(urwid.Text(("inactive_text", " (empty)")))

        self.file_walker[:] = entries
        if focus_pos is not None and focus_pos < len(entries):
            self.file_listbox.set_focus(focus_pos)
        elif entries:
            self.file_listbox.set_focus(0)

    def _entry_clicked(self, entry_widget, user_data=None):
        """Navigate into a directory, or toggle a file's selection."""
        entry = user_data if user_data else entry_widget
        if entry.is_dir or entry.is_parent:
            self.current_path = entry.full_path
            self._populate()
        else:
            if entry.full_path in self.delegate.pending_attachments:
                self.delegate.pending_attachments.remove(entry.full_path)
            else:
                self.delegate.pending_attachments.append(entry.full_path)
            # Keep the attachment indicator in the delegate's footer current.
            self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
            self._populate()

    def _dismiss(self, sender):
        """"Done": close the browser and keep the current selection."""
        self.delegate.file_browser_closed()

    def _cancel(self, sender):
        """"Cancel": drop every pending attachment, then close the browser."""
        self.delegate.pending_attachments.clear()
        self.delegate.frame.contents["footer"] = (self.delegate._build_footer(), None)
        self.delegate.file_browser_closed()

    def keypress(self, size, key):
        """Close on Esc; move between list and buttons with unhandled Up/Down."""
        if key == "esc":
            self.delegate.file_browser_closed()
            return
        result = super().keypress(size, key)
        # A "down"/"up" that the focused part did not consume moves focus
        # between the file list and the button row.
        if result == "down" and self.browser_frame.focus_position == "body":
            self.browser_frame.focus_position = "footer"
            return
        elif result == "up" and self.browser_frame.focus_position == "footer":
            self.browser_frame.focus_position = "body"
            return
        return result
class SyncProgressBar(urwid.ProgressBar):
    """Progress bar whose label is the shared app's sync status text."""

    def get_text(self):
        """Return the sync status, followed by the base-class text when the
        app asks for the percentage to be shown."""
        shared_instance = nomadnet.NomadNetworkApp.get_shared_instance
        status = shared_instance().get_sync_status()
        if not shared_instance().sync_status_show_percent():
            return status
        return status+" "+super().get_text()