feat: Add contact and channel database adapters for pymc_core integration

- Introduced ContactBookAdapter and ChannelDatabaseAdapter classes to facilitate interaction with the bot's database for contact and channel information.
- Implemented caching mechanisms for efficient retrieval of contacts and channels.
- Enhanced channel handling by including public and common hashtag channels, along with custom channels from the database.
- Added error handling and logging for improved debugging and operational transparency.
This commit is contained in:
agessaman
2026-02-01 16:59:55 -08:00
parent 95184dba97
commit c08dba316b
2 changed files with 1051 additions and 156 deletions
+458
View File
@@ -0,0 +1,458 @@
"""
MeshTNC Serial Wrapper - CLI Mode with RXLOG parsing
This wrapper uses MeshTNC's CLI mode instead of KISS mode as a workaround
for the KISS RX bug where received packets are not forwarded to serial.
In CLI mode:
- RX packets come as RXLOG lines: "timestamp,RXLOG,rssi,snr,hex_data"
- TX packets are sent via KISS mode (temporarily switch, send, switch back)
"""
import asyncio
import logging
import threading
import time
from collections import deque
from typing import Any, Callable, Dict, Optional
import serial
logger = logging.getLogger("MeshTNCSerial")
# KISS Protocol Constants (for TX only)
KISS_FEND = 0xC0
KISS_FESC = 0xDB
KISS_TFEND = 0xDC
KISS_TFESC = 0xDD
class MeshTNCSerial:
"""
MeshTNC Serial Interface using CLI mode for RX and KISS for TX.
Implements the same interface as KissSerialWrapper for compatibility
with pymc_core's Dispatcher.
"""
def __init__(
self,
port: str,
baudrate: int = 115200,
timeout: float = 1.0,
radio_config: Optional[Dict[str, Any]] = None,
on_frame_received: Optional[Callable[[bytes], None]] = None,
):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.radio_config = radio_config or {}
self.serial_conn: Optional[serial.Serial] = None
self.is_connected = False
self.kiss_mode_active = False
# RX handling
self.on_frame_received = on_frame_received
self.rx_thread: Optional[threading.Thread] = None
self.stop_event = threading.Event()
# TX queue for KISS frames
self.tx_queue = deque(maxlen=100)
self.tx_thread: Optional[threading.Thread] = None
self.tx_lock = threading.Lock()
# Stats
self.stats = {
"frames_sent": 0,
"frames_received": 0,
"bytes_sent": 0,
"bytes_received": 0,
"last_rssi": None,
"last_snr": None,
}
# Line buffer for CLI mode parsing
self._line_buffer = ""
# Event loop reference for thread-safe callbacks
self._loop: Optional[asyncio.AbstractEventLoop] = None
def connect(self) -> bool:
"""Connect to MeshTNC and configure radio."""
try:
self.serial_conn = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)
self.is_connected = True
self.stop_event.clear()
logger.info(f"MeshTNC connected to {self.port} at {self.baudrate} baud")
# Exit any existing KISS mode
self._exit_kiss_mode()
time.sleep(0.3)
self._clear_buffer()
# Configure radio
if self.radio_config:
if not self._configure_radio():
logger.error("Failed to configure radio")
return False
# Enable rxlog for receiving packets
self._send_command("rxlog on")
time.sleep(0.2)
self._clear_buffer()
# Start RX thread (CLI mode parsing)
self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True)
self.rx_thread.start()
# Start TX thread
self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True)
self.tx_thread.start()
logger.info("MeshTNC configured in CLI mode with rxlog enabled")
return True
except Exception as e:
logger.error(f"Failed to connect to MeshTNC: {e}")
self.is_connected = False
return False
def disconnect(self):
"""Disconnect from MeshTNC."""
self.is_connected = False
self.stop_event.set()
if self.rx_thread and self.rx_thread.is_alive():
self.rx_thread.join(timeout=2.0)
if self.tx_thread and self.tx_thread.is_alive():
self.tx_thread.join(timeout=2.0)
if self.serial_conn and self.serial_conn.is_open:
try:
self._send_command("rxlog off")
except:
pass
self.serial_conn.close()
logger.info("MeshTNC disconnected")
def _configure_radio(self) -> bool:
"""Configure radio settings via CLI."""
try:
freq_hz = self.radio_config.get("frequency", 910525000)
bw_hz = self.radio_config.get("bandwidth", 62500)
sf = self.radio_config.get("spreading_factor", 7)
cr = self.radio_config.get("coding_rate", 5)
sync_word = self.radio_config.get("sync_word", 0x12)
# Convert to CLI format
freq_mhz = freq_hz / 1_000_000
bw_khz = bw_hz / 1000
if isinstance(sync_word, int):
sync_str = f"0x{sync_word:02X}"
else:
sync_str = str(sync_word)
cmd = f"set radio {freq_mhz},{bw_khz},{sf},{cr},{sync_str}"
logger.info(f"Configuring radio: {cmd}")
response = self._send_command(cmd)
logger.info(f"Radio config response: {response}")
return True
except Exception as e:
logger.error(f"Radio configuration error: {e}")
return False
def _send_command(self, cmd: str) -> str:
"""Send a CLI command and return response."""
if not self.serial_conn or not self.serial_conn.is_open:
return ""
try:
self.serial_conn.write(f"{cmd}\r\n".encode('ascii'))
self.serial_conn.flush()
time.sleep(0.3)
response = ""
if self.serial_conn.in_waiting > 0:
response = self.serial_conn.read(
self.serial_conn.in_waiting
).decode('utf-8', errors='ignore')
return response.strip()
except Exception as e:
logger.error(f"Command error: {e}")
return ""
def _clear_buffer(self):
"""Clear serial buffer."""
if self.serial_conn and self.serial_conn.is_open:
try:
if self.serial_conn.in_waiting > 0:
self.serial_conn.read(self.serial_conn.in_waiting)
except:
pass
def _exit_kiss_mode(self):
"""Exit KISS mode if active."""
if self.serial_conn and self.serial_conn.is_open:
try:
self.serial_conn.write(bytes([KISS_FEND, 0xFF, KISS_FEND]))
self.serial_conn.flush()
self.kiss_mode_active = False
except:
pass
def _rx_worker(self):
"""Background thread for receiving and parsing RXLOG lines."""
logger.info("RX worker started (CLI/RXLOG mode)")
while not self.stop_event.is_set() and self.is_connected:
try:
if self.serial_conn and self.serial_conn.in_waiting > 0:
data = self.serial_conn.read(self.serial_conn.in_waiting)
text = data.decode('utf-8', errors='ignore')
self._line_buffer += text
# Process complete lines
while '\n' in self._line_buffer:
line, self._line_buffer = self._line_buffer.split('\n', 1)
line = line.strip()
if line:
self._process_line(line)
else:
time.sleep(0.01)
except Exception as e:
if self.is_connected:
logger.error(f"RX worker error: {e}")
break
logger.info("RX worker stopped")
def _process_line(self, line: str):
"""Process a CLI output line, looking for RXLOG entries."""
# RXLOG format: timestamp,RXLOG,rssi,snr,hex_data
# Example: 946688557,RXLOG,-91.00,10.00,15125A077ECD3CE859BF...
if ',RXLOG,' not in line:
return
try:
parts = line.split(',')
if len(parts) >= 5 and parts[1] == 'RXLOG':
rssi = float(parts[2])
snr = float(parts[3])
hex_data = parts[4]
# Convert hex to bytes
packet_data = bytes.fromhex(hex_data)
# Update stats
self.stats["frames_received"] += 1
self.stats["bytes_received"] += len(packet_data)
self.stats["last_rssi"] = rssi
self.stats["last_snr"] = snr
logger.debug(f"RX packet: {len(packet_data)} bytes, RSSI={rssi}, SNR={snr}")
# Call the callback (thread-safe)
if self.on_frame_received and len(packet_data) > 0:
self._invoke_callback(packet_data)
except Exception as e:
logger.debug(f"Failed to parse RXLOG line: {line} - {e}")
def _invoke_callback(self, packet_data: bytes):
"""Invoke the RX callback in a thread-safe manner."""
if not self.on_frame_received:
return
try:
# Try to get the running loop (works if called from async context)
try:
loop = asyncio.get_running_loop()
# We're in an async context, create task directly
loop.create_task(self._async_callback_wrapper(packet_data))
return
except RuntimeError:
pass
# Use stored loop reference if available
if self._loop and self._loop.is_running():
# Schedule callback on the event loop from this thread
self._loop.call_soon_threadsafe(
lambda: self._loop.create_task(
self._async_callback_wrapper(packet_data)
)
)
else:
# Fallback: direct call (may not work with async callbacks)
self.on_frame_received(packet_data)
except Exception as e:
logger.error(f"Error invoking callback: {e}")
async def _async_callback_wrapper(self, packet_data: bytes):
"""Async wrapper to invoke the callback."""
try:
self.on_frame_received(packet_data)
except Exception as e:
logger.error(f"Error in frame received callback: {e}")
def _tx_worker(self):
"""Background thread for transmitting packets."""
logger.info("TX worker started")
while not self.stop_event.is_set() and self.is_connected:
try:
if self.tx_queue:
with self.tx_lock:
if self.tx_queue:
packet = self.tx_queue.popleft()
self._transmit_packet(packet)
else:
time.sleep(0.01)
except Exception as e:
if self.is_connected:
logger.error(f"TX worker error: {e}")
break
logger.info("TX worker stopped")
def _transmit_packet(self, data: bytes):
"""Transmit a packet using KISS mode temporarily."""
if not self.serial_conn or not self.serial_conn.is_open:
return
try:
# Enter KISS mode
self._send_command("serial mode kiss")
time.sleep(0.5)
self.kiss_mode_active = True
# Encode as KISS frame
kiss_frame = self._encode_kiss_frame(data)
# Send
self.serial_conn.write(kiss_frame)
self.serial_conn.flush()
self.stats["frames_sent"] += 1
self.stats["bytes_sent"] += len(data)
logger.debug(f"TX packet: {len(data)} bytes")
# Wait for transmission to complete
time.sleep(0.3)
# Exit KISS mode and re-enable rxlog
self._exit_kiss_mode()
time.sleep(0.2)
self._clear_buffer()
self._send_command("rxlog on")
time.sleep(0.1)
self._clear_buffer()
except Exception as e:
logger.error(f"Transmit error: {e}")
# Try to recover
self._exit_kiss_mode()
self._send_command("rxlog on")
def _encode_kiss_frame(self, data: bytes) -> bytes:
"""Encode data as a KISS frame."""
frame = bytearray([KISS_FEND, 0x00]) # FEND + Data command
for byte in data:
if byte == KISS_FEND:
frame.extend([KISS_FESC, KISS_TFEND])
elif byte == KISS_FESC:
frame.extend([KISS_FESC, KISS_TFESC])
else:
frame.append(byte)
frame.append(KISS_FEND)
return bytes(frame)
# =========================================================================
# Interface methods compatible with KissSerialWrapper / LoRaRadio
# =========================================================================
def set_rx_callback(self, callback: Callable[[bytes], None]):
"""Set the RX callback function."""
self.on_frame_received = callback
# Capture the current event loop for thread-safe callbacks
try:
self._loop = asyncio.get_running_loop()
logger.debug("RX callback set with event loop reference")
except RuntimeError:
# No loop running yet, will be set later
logger.debug("RX callback set (no event loop yet)")
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
"""Explicitly set the event loop for thread-safe callbacks."""
self._loop = loop
logger.debug("Event loop reference set explicitly")
def send_frame(self, data: bytes) -> bool:
"""Queue a frame for transmission."""
if not self.is_connected:
return False
with self.tx_lock:
if len(self.tx_queue) < 100:
self.tx_queue.append(data)
return True
logger.warning("TX queue full")
return False
async def send(self, data: bytes) -> None:
"""Async send interface for pymc_core compatibility."""
if not self.send_frame(data):
raise Exception("Failed to queue frame for transmission")
return None
def begin(self):
"""Initialize the radio (called by some interfaces)."""
if not self.connect():
raise Exception("Failed to initialize MeshTNC")
def get_last_rssi(self) -> float:
"""Get last received RSSI."""
return self.stats.get("last_rssi") or -999
def get_last_snr(self) -> float:
"""Get last received SNR."""
return self.stats.get("last_snr") or -999
def get_stats(self) -> Dict[str, Any]:
"""Get interface statistics."""
return self.stats.copy()
def sleep(self):
"""Put radio to sleep (not supported)."""
pass
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
+593 -156
View File
@@ -30,6 +30,14 @@ except ImportError as e:
KissSerialWrapper = None
pymc_constants = None
# Import MeshTNC CLI mode wrapper (workaround for KISS RX bug)
try:
from modules.meshtnc_serial import MeshTNCSerial
MESHTNC_SERIAL_AVAILABLE = True
except ImportError:
MeshTNCSerial = None
MESHTNC_SERIAL_AVAILABLE = False
class EventType(IntEnum):
"""Event types compatible with meshcore EventType for bot compatibility."""
@@ -70,6 +78,230 @@ class ContactInfo:
return getattr(self, key, default)
class ContactBookAdapter:
"""
Adapter to provide contact information from the bot's database to pymc_core.
pymc_core's TextMessageHandler expects a contacts object with a 'contacts'
attribute that is iterable and contains contact objects with 'public_key'.
"""
def __init__(self, bot):
self.bot = bot
self.logger = logging.getLogger("ContactBookAdapter")
self._contact_cache: List[Any] = []
self._cache_time = 0
self._cache_ttl = 30 # Refresh cache every 30 seconds
@property
def contacts(self) -> List[Any]:
"""Get contacts list (required by pymc_core's dispatcher)."""
return self._get_contacts()
def _get_contacts(self) -> List[Any]:
"""Get all contacts from the database."""
import time as _time
# Check if cache is still valid
now = _time.time()
if now - self._cache_time < self._cache_ttl and self._contact_cache:
return self._contact_cache
contacts = []
try:
if not hasattr(self.bot, 'db_manager'):
self.logger.debug("Database manager not available for contacts")
return contacts
# Query contacts from database
query = """
SELECT public_key, name, role, device_type
FROM complete_contact_tracking
WHERE public_key IS NOT NULL AND public_key != ''
"""
rows = self.bot.db_manager.execute_query(query)
for row in rows:
# Create a simple object that pymc_core can use
contact = type('Contact', (), {
'public_key': row.get('public_key', ''),
'name': row.get('name', ''),
'role': row.get('role', 'companion'),
})()
contacts.append(contact)
self._contact_cache = contacts
self._cache_time = now
self.logger.debug(f"Loaded {len(contacts)} contacts from database")
except Exception as e:
self.logger.error(f"Error loading contacts: {e}")
return contacts
def get_contact_by_public_key(self, public_key: str):
"""Find a contact by public key."""
for contact in self.contacts:
if contact.public_key == public_key:
return contact
return None
class ChannelDatabaseAdapter:
"""
Adapter to provide channel information from the bot's database to pymc_core.
pymc_core's GroupTextHandler expects a channel_db with a get_channels() method
that returns a list of dicts with 'name' and 'secret' keys.
"""
# Well-known Public channel secret (from MeshCore spec)
PUBLIC_CHANNEL_SECRET = "d8ee687c9be53be08d24a7f7aede4dac5de3168dea03c12e7b9c96c5511e807f"
# Common hashtag channels to always include
COMMON_HASHTAG_CHANNELS = ['#Public', '#general', '#test']
def __init__(self, bot):
self.bot = bot
self.logger = logging.getLogger("ChannelDatabaseAdapter")
self._channel_cache: List[Dict[str, Any]] = []
self._cache_time = 0
self._cache_ttl = 30 # Refresh cache every 30 seconds
@staticmethod
def derive_hashtag_key(channel_name: str) -> str:
"""
Derive channel key from a hashtag channel name.
For channels starting with #, the key is derived by SHA256 hashing
the channel name (including the #).
Args:
channel_name: Channel name (e.g., "#howltest")
Returns:
Hex string of the 32-byte derived key
"""
import hashlib
# Hash the channel name (including #) to get the key
key = hashlib.sha256(channel_name.encode('utf-8')).digest()
return key.hex()
@staticmethod
def get_channel_hash(secret_hex: str) -> int:
"""
Get the channel hash from a secret (first byte of SHA256(secret)).
This matches pymc_core's _derive_channel_hash method.
"""
import hashlib
try:
secret_bytes = bytes.fromhex(secret_hex)
except ValueError:
secret_bytes = secret_hex.encode('utf-8')
return hashlib.sha256(secret_bytes).digest()[0]
def get_channels(self) -> List[Dict[str, Any]]:
"""
Get all channels with their secrets.
Includes:
- Public channel (well-known key)
- Hashtag channels from config (derived keys)
- Custom channels from database
Returns:
List of channel dicts with 'name' and 'secret' keys
"""
import time as _time
# Check if cache is still valid
now = _time.time()
if now - self._cache_time < self._cache_ttl and self._channel_cache:
self.logger.debug(f"Returning cached {len(self._channel_cache)} channels")
return self._channel_cache
channels = []
self.logger.info("Building channel list...")
# Always include the Public channel (well-known secret)
public_hash = self.get_channel_hash(self.PUBLIC_CHANNEL_SECRET)
channels.append({
'name': 'Public',
'secret': self.PUBLIC_CHANNEL_SECRET,
'idx': 0
})
self.logger.info(f"Added Public channel with hash 0x{public_hash:02X}")
# Add common hashtag channels
for ch_name in self.COMMON_HASHTAG_CHANNELS:
derived_key = self.derive_hashtag_key(ch_name)
ch_hash = self.get_channel_hash(derived_key)
if not any(c['name'] == ch_name for c in channels):
channels.append({
'name': ch_name,
'secret': derived_key,
'idx': len(channels)
})
self.logger.info(f"Added common channel {ch_name} with hash 0x{ch_hash:02X}")
# Add hashtag channels from config
try:
if hasattr(self.bot, 'config'):
monitor_channels = self.bot.config.get('Channels', 'monitor_channels', fallback='')
if monitor_channels:
for ch_name in monitor_channels.split(','):
ch_name = ch_name.strip()
if ch_name.startswith('#') and not any(c['name'] == ch_name for c in channels):
# Derive key for hashtag channel
derived_key = self.derive_hashtag_key(ch_name)
ch_hash = self.get_channel_hash(derived_key)
channels.append({
'name': ch_name,
'secret': derived_key,
'idx': len(channels)
})
self.logger.debug(f"Added config channel {ch_name} with hash 0x{ch_hash:02X}")
except Exception as e:
self.logger.warning(f"Error loading hashtag channels from config: {e}")
# Add channels from database
try:
if hasattr(self.bot, 'db_manager'):
query = """
SELECT channel_idx, channel_name, channel_key_hex
FROM channels
WHERE channel_key_hex IS NOT NULL AND channel_key_hex != ''
"""
rows = self.bot.db_manager.execute_query(query)
for row in rows:
channel_name = row.get('channel_name', '')
channel_key_hex = row.get('channel_key_hex', '')
if channel_name and channel_key_hex:
# Don't duplicate if already added
if not any(c['name'] == channel_name for c in channels):
channels.append({
'name': channel_name,
'secret': channel_key_hex,
'idx': row.get('channel_idx', 0)
})
except Exception as e:
self.logger.warning(f"Error loading channels from database: {e}")
self._channel_cache = channels
self._cache_time = now
self.logger.info(f"Loaded {len(channels)} channels (Public + {len(channels)-1} configured)")
return channels
class PyMCConnection:
"""
Wrapper around pyMC_core MeshNode providing a meshcore-compatible interface.
@@ -170,41 +402,67 @@ class PyMCConnection:
}
self.logger.info(f"Radio config: freq={frequency/1e6:.3f}MHz, BW={bandwidth/1000}kHz, SF={spreading_factor}")
# Create KISS serial wrapper
self._radio = KissSerialWrapper(
port=serial_port,
baudrate=baudrate,
radio_config=kiss_config,
auto_configure=True
)
# Use MeshTNCSerial (CLI/rxlog mode) instead of KissSerialWrapper
# This is a workaround for the MeshTNC KISS mode RX bug where
# received packets are not forwarded to serial in KISS mode
if MESHTNC_SERIAL_AVAILABLE:
self.logger.info("Using MeshTNCSerial (CLI/rxlog mode) for RX")
self._radio = MeshTNCSerial(
port=serial_port,
baudrate=baudrate,
radio_config=kiss_config,
)
else:
self.logger.info("Using KissSerialWrapper (KISS mode)")
self._radio = KissSerialWrapper(
port=serial_port,
baudrate=baudrate,
radio_config=kiss_config,
auto_configure=True
)
# Connect to radio
if not self._radio.connect():
self.logger.error(f"Failed to connect to MeshTNC on {serial_port}")
return False
self.logger.info("KISS radio connected successfully")
self.logger.info("Radio connected successfully")
# Get bot name from config
bot_name = self.config.get('bot_name', 'MeshCoreBot')
# Create mesh node
# Create adapters for pymc_core
self._channel_db = ChannelDatabaseAdapter(self.bot)
self._contact_book = ContactBookAdapter(self.bot)
# Create mesh node with channel database and contacts
node_config = {
"node": {"name": bot_name}
}
self._mesh_node = MeshNode(
radio=self._radio,
local_identity=self._identity,
config=node_config
config=node_config,
channel_db=self._channel_db,
contacts=self._contact_book
)
# Start the mesh node
await self._mesh_node.start()
# Setup event handlers
# Setup event handlers BEFORE starting the node
# This ensures callbacks are registered before packets arrive
self._setup_event_handlers()
# Start the mesh node dispatcher as a background task
# (MeshNode.start() calls dispatcher.run_forever() which blocks)
self._dispatcher_task = asyncio.create_task(self._mesh_node.start())
self.logger.info("Mesh node dispatcher started as background task")
# Set the event loop on the radio for thread-safe callbacks
# This is needed because RX happens in a background thread
if hasattr(self._radio, 'set_event_loop'):
self._radio.set_event_loop(asyncio.get_running_loop())
self.logger.info("Event loop set on radio for thread-safe callbacks")
# Load contacts from database
await self._load_contacts_from_database()
@@ -231,21 +489,31 @@ class PyMCConnection:
async def disconnect(self) -> None:
"""Disconnect from radio."""
self._connected = False
# Cancel the dispatcher task
if hasattr(self, '_dispatcher_task') and self._dispatcher_task:
if not self._dispatcher_task.done():
self._dispatcher_task.cancel()
try:
await self._dispatcher_task
except asyncio.CancelledError:
pass
self._dispatcher_task = None
if self._mesh_node:
try:
await self._mesh_node.stop()
self._mesh_node.stop()
except Exception as e:
self.logger.warning(f"Error stopping mesh node: {e}")
self._mesh_node = None
if self._radio:
try:
self._radio.disconnect()
except Exception as e:
self.logger.warning(f"Error disconnecting radio: {e}")
self._radio = None
self.logger.info("PyMC connection closed")
def subscribe(self, event_type: EventType, callback: Callable) -> None:
@@ -276,19 +544,22 @@ class PyMCConnection:
self.logger.error(f"Error in event callback: {e}")
def _setup_event_handlers(self) -> None:
"""Setup handlers for pyMC_core events."""
"""Setup handlers for pyMC_core events via dispatcher callbacks."""
if not self._mesh_node:
self.logger.warning("Cannot setup event handlers - mesh node not initialized")
return
# Get the event service from mesh node
event_service = getattr(self._mesh_node, 'event_service', None)
if not event_service:
self.logger.warning("MeshNode does not have event_service")
return
# Register handlers for different packet types
# pyMC_core uses a different event system, we need to translate
self.logger.info("Event handlers setup for pyMC_core")
dispatcher = self._mesh_node.dispatcher
# Register raw packet callback for logging and RAW_DATA events
dispatcher.set_raw_packet_callback(self._on_raw_packet)
self.logger.info("Registered raw packet callback with dispatcher")
# Register packet received callback for processed packets
dispatcher.set_packet_received_callback(self._on_packet_received)
self.logger.info("Registered packet received callback with dispatcher")
self.logger.info("Event handlers setup for pyMC_core dispatcher")
async def _load_contacts_from_database(self) -> None:
"""Load contacts from the complete_contact_tracking table."""
@@ -364,140 +635,205 @@ class PyMCConnection:
async def start_auto_message_fetching(self) -> None:
"""
Start automatic message fetching loop.
For pyMC_core, this starts the receive loop that processes
incoming packets from the KISS TNC.
Start automatic message fetching.
For pyMC_core, message fetching is handled by the dispatcher's
callback system. This method ensures the dispatcher task is running.
"""
if not self._mesh_node:
self.logger.warning("Cannot start message fetching - mesh node not initialized")
return
# Start background task for receiving packets
asyncio.create_task(self._receive_loop())
self.logger.info("Auto message fetching started")
async def _receive_loop(self) -> None:
"""Background loop for receiving and processing packets."""
self.logger.info("Starting receive loop")
while self._connected and self._radio:
try:
# Check for incoming packets
if hasattr(self._radio, 'receive'):
packet = await asyncio.get_event_loop().run_in_executor(
None, self._radio.receive
)
if packet:
await self._process_incoming_packet(packet)
# Small delay to prevent busy loop
await asyncio.sleep(0.01)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in receive loop: {e}")
await asyncio.sleep(1)
self.logger.info("Receive loop stopped")
async def _process_incoming_packet(self, packet: bytes) -> None:
# The dispatcher task was started in connect()
# Just verify it's still running
if hasattr(self, '_dispatcher_task') and self._dispatcher_task:
if self._dispatcher_task.done():
# Check if it failed with an exception
try:
self._dispatcher_task.result()
except Exception as e:
self.logger.error(f"Dispatcher task failed: {e}")
# Restart it
self._dispatcher_task = asyncio.create_task(self._mesh_node.start())
self.logger.info("Restarted dispatcher task")
self.logger.info("Auto message fetching active (via dispatcher callbacks)")
async def _on_raw_packet(self, pkt, data: bytes, analysis: dict = None) -> None:
"""
Process an incoming packet and emit appropriate events.
Args:
packet: Raw packet bytes from radio
Callback for raw packets from the dispatcher.
This is called for EVERY packet received, before handler processing.
Emits RAW_DATA events for the bot's event system.
"""
try:
# Decode packet using pyMC_core
# This is a simplified version - full implementation would use
# pyMC_core's packet parsing
if len(packet) < 2:
return
# Extract header byte
header = packet[0]
route_type = header & 0x03
payload_type = (header >> 2) & 0x0F
# Calculate SNR/RSSI if available from KISS metadata
snr = 0
rssi = -120
# Emit raw data event
# Get signal strength from packet or radio
snr = getattr(pkt, '_snr', 0) or (self._radio.get_last_snr() if self._radio else 0)
rssi = getattr(pkt, '_rssi', -120) or (self._radio.get_last_rssi() if self._radio else -120)
self.logger.debug(f"Raw packet received: {len(data)} bytes, SNR={snr}, RSSI={rssi}")
# Emit raw data event (with 'data' field for bot compatibility)
await self._emit_event(
EventType.RAW_DATA,
{
'raw_hex': packet.hex(),
'data': data.hex(), # Bot expects 'data' field
'raw_hex': data.hex(),
'SNR': snr,
'RSSI': rssi,
'timestamp': time.time()
}
)
# Parse based on payload type
if payload_type == 0x01: # TXT_MSG
await self._handle_text_message(packet, snr, rssi)
elif payload_type == 0x04: # ADVERT
await self._handle_advertisement(packet, snr, rssi)
# Also emit RX_LOG_DATA for signal metrics
await self._emit_event(
EventType.RX_LOG_DATA,
{
'SNR': snr,
'RSSI': rssi,
'timestamp': time.time()
}
)
except Exception as e:
self.logger.error(f"Error processing packet: {e}")
async def _handle_text_message(self, packet: bytes, snr: float, rssi: float) -> None:
"""Handle incoming text message packet."""
self.logger.error(f"Error in raw packet callback: {e}")
async def _on_packet_received(self, pkt) -> None:
"""
Callback for processed packets from the dispatcher.
This is called after the dispatcher has parsed and handled the packet.
We translate pymc_core packets to our event system.
"""
try:
# Parse message packet
# This is simplified - full implementation needs proper MeshCore parsing
header = packet[0]
route_type = header & 0x03
# Skip header and extract path/payload
# Format depends on route type
payload = {
'text': '', # Will be decrypted
'pubkey_prefix': '',
# Import constants for payload types
from pymc_core.protocol.constants import (
PAYLOAD_TYPE_ADVERT,
PAYLOAD_TYPE_ACK,
PAYLOAD_TYPE_TXT_MSG,
PAYLOAD_TYPE_GRP_TXT,
PH_TYPE_SHIFT,
)
payload_type = pkt.header >> PH_TYPE_SHIFT
snr = getattr(pkt, '_snr', 0)
rssi = getattr(pkt, '_rssi', -120)
self.logger.info(f"Packet received: type={payload_type}, SNR={snr}, RSSI={rssi}")
if payload_type == PAYLOAD_TYPE_ADVERT:
await self._handle_advert_packet(pkt, snr, rssi)
elif payload_type == PAYLOAD_TYPE_TXT_MSG:
await self._handle_text_packet(pkt, snr, rssi)
elif payload_type == PAYLOAD_TYPE_GRP_TXT:
await self._handle_group_text_packet(pkt, snr, rssi)
# ACKs are handled by the dispatcher internally
except Exception as e:
self.logger.error(f"Error in packet received callback: {e}")
import traceback
self.logger.error(traceback.format_exc())
async def _handle_advert_packet(self, pkt, snr: float, rssi: float) -> None:
"""Handle advertisement packet from dispatcher."""
try:
# Extract advertisement info from packet
# The advert handler in pymc_core parses this
payload = pkt.payload if hasattr(pkt, 'payload') else b''
# Basic info we can extract
event_payload = {
'public_key': payload[2:34].hex() if len(payload) >= 34 else '',
'name': '', # Name is in the advert data after pubkey
'latitude': None,
'longitude': None,
'flags': payload[1] if len(payload) > 1 else 0,
'SNR': snr,
'RSSI': rssi,
'raw_hex': payload.hex() if payload else ''
}
# Try to extract name from advert
if len(payload) > 34:
try:
name_end = payload.find(b'\x00', 34)
if name_end > 34:
event_payload['name'] = payload[34:name_end].decode('utf-8', errors='ignore')
except Exception:
pass
self.logger.info(f"Advertisement from {event_payload.get('name', 'unknown')}")
# Emit new contact event
await self._emit_event(EventType.NEW_CONTACT, event_payload)
await self._emit_event(EventType.ADVERT_RECV, event_payload)
except Exception as e:
self.logger.error(f"Error handling advertisement: {e}")
async def _handle_text_packet(self, pkt, snr: float, rssi: float) -> None:
"""Handle text message packet from dispatcher."""
try:
# The text handler in pymc_core decrypts and parses the message
# We need to extract the relevant info
payload = pkt.payload if hasattr(pkt, 'payload') else b''
# Get source hash from payload
src_hash = payload[1] if len(payload) > 1 else 0
event_payload = {
'text': '', # Will need decryption via handler
'pubkey_prefix': f'{src_hash:02x}',
'SNR': snr,
'RSSI': rssi,
'path_len': 0,
'raw_hex': packet.hex()
'raw_hex': payload.hex() if payload else ''
}
# Emit as DM or channel message based on route type
# For now, emit as contact message
await self._emit_event(EventType.CONTACT_MSG_RECV, payload)
self.logger.info(f"Text message from {event_payload['pubkey_prefix']}")
# Emit contact message event
await self._emit_event(EventType.CONTACT_MSG_RECV, event_payload)
except Exception as e:
self.logger.error(f"Error handling text message: {e}")
async def _handle_advertisement(self, packet: bytes, snr: float, rssi: float) -> None:
"""Handle incoming advertisement packet."""
async def _handle_group_text_packet(self, pkt, snr: float, rssi: float) -> None:
"""Handle group/channel text message packet from dispatcher."""
try:
# Parse advertisement packet
# Advertisements contain node identity, location, flags
payload = {
'public_key': '',
'name': '',
'latitude': None,
'longitude': None,
'flags': 0,
payload = pkt.payload if hasattr(pkt, 'payload') else b''
# The GroupTextHandler in pymc_core stores decrypted data in pkt.decrypted
group_data = getattr(pkt, 'decrypted', {}).get('group_text_data', {})
text = group_data.get('text', '')
channel_name = group_data.get('channel_name', '')
sender_name = group_data.get('sender_name', 'Unknown')
channel_hash = group_data.get('channel_hash', 0)
# If group_data is empty, the message wasn't decrypted (missing channel key)
if not group_data:
self.logger.warning(f"Channel message not decrypted - missing channel key? hash=0x{payload[0]:02X}")
event_payload = {
'text': text,
'channel': channel_name,
'channel_name': channel_name,
'channel_idx': channel_hash, # Use hash as index for compatibility
'sender': sender_name,
'sender_name': sender_name,
'SNR': snr,
'RSSI': rssi,
'raw_hex': packet.hex()
'raw_hex': payload.hex() if payload else ''
}
# Emit new contact event
await self._emit_event(EventType.NEW_CONTACT, payload)
await self._emit_event(EventType.ADVERT_RECV, payload)
self.logger.info(f"Channel message on [{channel_name}] from {sender_name}: {text[:50]}...")
# Emit channel message event
await self._emit_event(EventType.CHANNEL_MSG_RECV, event_payload)
except Exception as e:
self.logger.error(f"Error handling advertisement: {e}")
self.logger.error(f"Error handling group text: {e}")
class PyMCCommands:
@@ -624,31 +960,51 @@ class PyMCCommands:
contacts = list(self.connection._contacts.values())
return Event(type=EventType.OK, payload={'contacts': contacts})
async def add_contact(self, public_key: str, name: str) -> Event:
async def add_contact(self, public_key_or_data, name: str = None) -> Event:
"""
Add a contact to the database.
Args:
public_key: Contact's public key
name: Contact's name
Supports multiple calling conventions:
add_contact(contact_data_dict)
add_contact(public_key, name)
add_contact(name, public_key)
Returns:
Event indicating success or failure
"""
try:
# Add to database via repeater manager
if hasattr(self.connection.bot, 'repeater_manager'):
# This will be tracked via advertisement handling
pass
# Handle dict argument (contact_data)
if isinstance(public_key_or_data, dict):
contact_data = public_key_or_data
public_key = contact_data.get('public_key', '')
name = contact_data.get('name', contact_data.get('adv_name', ''))
elif name is None:
# Single string argument - assume it's a name
name = public_key_or_data
public_key = ''
else:
# Two arguments - could be (key, name) or (name, key)
if len(public_key_or_data) == 64:
# Looks like a public key (64 hex chars)
public_key = public_key_or_data
else:
# Assume first is name, second is key
public_key = name
name = public_key_or_data
if not name:
name = f"Contact_{public_key[:8]}" if public_key else "Unknown"
# Add to local cache
contact = ContactInfo(public_key=public_key, name=name)
key = public_key[:12]
key = public_key[:12] if public_key else name
self.connection._contacts[key] = contact
self.logger.info(f"Added contact: {name} ({public_key[:12] if public_key else 'no key'})")
return Event(type=EventType.OK, payload={'added': True})
except Exception as e:
self.logger.error(f"Error adding contact: {e}")
return Event(type=EventType.ERROR, payload={'error': str(e)})
async def remove_contact(self, public_key: str) -> Event:
@@ -667,6 +1023,87 @@ class PyMCCommands:
del self.connection._contacts[key]
return Event(type=EventType.OK, payload={'removed': True})
except Exception as e:
return Event(type=EventType.ERROR, payload={'error': str(e)})
async def get_channel(self, channel_idx: int) -> Event:
"""
Get channel information from database.
Args:
channel_idx: Channel index
Returns:
Event with channel info
"""
try:
if not hasattr(self.connection.bot, 'db_manager'):
return Event(type=EventType.ERROR, payload={'error': 'Database not available'})
query = """
SELECT channel_idx, channel_name, channel_type, channel_key_hex
FROM channels
WHERE channel_idx = ?
"""
rows = self.connection.bot.db_manager.execute_query(query, (channel_idx,))
if rows:
row = rows[0]
return Event(type=EventType.OK, payload={
'channel_idx': row.get('channel_idx'),
'channel_name': row.get('channel_name', ''),
'channel_type': row.get('channel_type', ''),
'channel_key_hex': row.get('channel_key_hex', ''),
'channel_secret': bytes.fromhex(row.get('channel_key_hex', '')) if row.get('channel_key_hex') else b''
})
else:
return Event(type=EventType.ERROR, payload={'error': f'Channel {channel_idx} not found'})
except Exception as e:
self.logger.error(f"Error getting channel: {e}")
return Event(type=EventType.ERROR, payload={'error': str(e)})
async def set_channel(self, channel_idx: int, channel_name: str, channel_secret) -> Event:
"""
Set/create a channel in the database.
Args:
channel_idx: Channel index
channel_name: Channel name
channel_secret: Channel key (bytes or hex string)
Returns:
Event indicating success or failure
"""
try:
if not hasattr(self.connection.bot, 'db_manager'):
return Event(type=EventType.ERROR, payload={'error': 'Database not available'})
# Convert secret to hex string if needed
if isinstance(channel_secret, bytes):
channel_key_hex = channel_secret.hex()
else:
channel_key_hex = str(channel_secret)
# Upsert channel in database
query = """
INSERT OR REPLACE INTO channels (channel_idx, channel_name, channel_key_hex, last_updated)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
"""
self.connection.bot.db_manager.execute_query(query, (channel_idx, channel_name, channel_key_hex))
# Invalidate channel cache
if hasattr(self.connection, '_channel_db'):
self.connection._channel_db._cache_time = 0
self.logger.info(f"Set channel {channel_idx}: {channel_name}")
return Event(type=EventType.OK, payload={
'channel_idx': channel_idx,
'channel_name': channel_name,
'channel_key_hex': channel_key_hex
})
except Exception as e:
self.logger.error(f"Error setting channel: {e}")
return Event(type=EventType.ERROR, payload={'error': str(e)})