Files
meshcore-bot/modules/channel_manager.py
agessaman 217d2a4089 Refactor database connection handling across multiple modules
- Replaced direct SQLite connection calls with a context manager in various modules to ensure proper resource management and prevent file descriptor leaks.
- Introduced a new `connection` method in `DBManager` to standardize connection handling.
- Updated all relevant database interactions in modules such as `feed_manager`, `scheduler`, `commands`, and others to utilize the new connection method.
- Improved code readability and maintainability by consolidating connection logic.
2026-03-01 14:12:22 -08:00

725 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Channel management functionality for the MeshCore Bot
Handles efficient concurrent channel fetching with caching
"""
import asyncio
import sys
import os
import hashlib
import copy
from typing import Dict, Any, List, Optional
from meshcore import EventType
class ChannelManager:
"""Manages channel operations and information with enhanced concurrent fetching"""
def __init__(self, bot, max_channels: int = 40):
"""
Initialize the channel manager
Args:
bot: The MeshCore bot instance
max_channels: Maximum number of channels to fetch (default 40)
"""
self.bot = bot
self.logger = bot.logger
self.max_channels = max_channels
self._channels_cache: Dict[int, Dict[str, Any]] = {}
self._cache_valid = False
self._fetch_timeout = 2.0 # Timeout for individual channel fetches
async def fetch_channels(self):
"""Fetch channels from the MeshCore node using enhanced concurrent fetching"""
self.logger.info("Fetching channels from MeshCore node using enhanced concurrent method...")
try:
# Wait a moment for the device to be ready
await asyncio.sleep(2)
# Fetch all channels concurrently
channels = await self.fetch_all_channels(force_refresh=True)
if channels:
self.logger.info(f"Successfully fetched {len(channels)} channels from MeshCore node")
for channel in channels:
channel_name = channel.get('channel_name', f'Channel{channel.get("channel_idx", "?")}')
channel_idx = channel.get('channel_idx', '?')
if channel_name: # Only log non-empty channel names
self.logger.info(f" Channel {channel_idx}: {channel_name}")
else:
self.logger.debug(f" Channel {channel_idx}: (empty)")
else:
self.logger.warning("No channels found on MeshCore node")
self.bot.meshcore.channels = {}
except Exception as e:
self.logger.error(f"Failed to fetch channels: {e}")
self.bot.meshcore.channels = {}
async def fetch_all_channels(self, force_refresh: bool = False) -> List[Dict[str, Any]]:
"""
Fetch all channels efficiently using optimized sequential requests
Args:
force_refresh: If True, bypass cache and fetch fresh data
Returns:
List of channel dictionaries with channel info
"""
if not force_refresh and self._cache_valid:
return self._get_cached_channels()
self.logger.info(f"Fetching all channels (0-{self.max_channels-1}) with optimized sequential method...")
# Check if device is connected before attempting fetch
if not hasattr(self.bot, 'connected') or not self.bot.connected:
self.logger.warning("Device not connected, skipping channel fetch")
return []
# Clear cache for fresh fetch
self._channels_cache.clear()
valid_channels = []
consecutive_timeouts = 0
max_consecutive_timeouts = 3 # Abort if first 3 channels all timeout
# Fetch channels sequentially but with optimized logic
for channel_idx in range(self.max_channels):
try:
result = await self._fetch_single_channel(channel_idx)
if result and result.get("channel_name"):
self._channels_cache[channel_idx] = result
valid_channels.append(result)
consecutive_timeouts = 0 # Reset timeout counter on success
self.logger.debug(f"Found channel {channel_idx}: {result.get('channel_name')}")
elif result and not result.get("channel_name"):
# Empty channel - log but don't stop
consecutive_timeouts = 0 # Reset timeout counter (device responded)
self.logger.debug(f"Channel {channel_idx} is empty")
else:
# No response - channel doesn't exist
consecutive_timeouts += 1
self.logger.debug(f"Channel {channel_idx} not found")
# If first few channels all timeout, device is likely unresponsive
if consecutive_timeouts >= max_consecutive_timeouts and channel_idx < max_consecutive_timeouts:
self.logger.warning(f"First {max_consecutive_timeouts} channels all timed out - device may be unresponsive, aborting fetch")
break
# Small delay between requests to avoid overwhelming the device
await asyncio.sleep(0.1)
except Exception as e:
consecutive_timeouts += 1
self.logger.debug(f"Error fetching channel {channel_idx}: {e}")
# Abort early if device appears unresponsive
if consecutive_timeouts >= max_consecutive_timeouts and channel_idx < max_consecutive_timeouts:
self.logger.warning(f"Multiple consecutive errors fetching channels - device may be unresponsive, aborting fetch")
break
continue
self._cache_valid = True
self.logger.info(f"Successfully fetched {len(valid_channels)} channels")
# Update the bot's meshcore channels for compatibility
self.bot.meshcore.channels = self._channels_cache
# Store channels in database for web viewer access
self._store_channels_in_db(valid_channels)
return valid_channels
def _store_channels_in_db(self, channels: List[Dict[str, Any]]):
"""Store channel information in database for web viewer access (full refresh - clears all first)"""
try:
with self.bot.db_manager.connection() as conn:
cursor = conn.cursor()
# Clear existing channels (full refresh)
cursor.execute('DELETE FROM channels')
# Insert all channels
for channel in channels:
self._insert_channel_in_db(cursor, channel)
conn.commit()
self.logger.debug(f"Stored {len(channels)} channels in database (full refresh)")
except Exception as e:
self.logger.warning(f"Failed to store channels in database: {e}")
def _store_single_channel_in_db(self, channel: Dict[str, Any]):
"""Store or update a single channel in database (without clearing others)"""
try:
with self.bot.db_manager.connection() as conn:
cursor = conn.cursor()
self._insert_channel_in_db(cursor, channel)
conn.commit()
self.logger.debug(f"Stored/updated channel {channel.get('channel_idx')} in database")
except Exception as e:
self.logger.warning(f"Failed to store single channel in database: {e}")
def _insert_channel_in_db(self, cursor, channel: Dict[str, Any]):
"""Helper method to insert/update a single channel in database"""
channel_idx = channel.get('channel_idx')
channel_name = channel.get('channel_name', '')
channel_key_hex = channel.get('channel_key_hex', '')
# Determine channel type based on key derivation
# If key matches hashtag derivation, it's a hashtag channel
channel_type = 'hashtag' # Default assumption
if channel_name and channel_key_hex:
# Check if key matches hashtag derivation
expected_key = self.generate_hashtag_key(channel_name)
if expected_key.hex() == channel_key_hex:
channel_type = 'hashtag'
else:
channel_type = 'custom'
if channel_name: # Only store non-empty channels
cursor.execute('''
INSERT OR REPLACE INTO channels
(channel_idx, channel_name, channel_type, channel_key_hex, last_updated)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (channel_idx, channel_name, channel_type, channel_key_hex))
async def _fetch_single_channel(self, channel_idx: int) -> Optional[Dict[str, Any]]:
"""
Fetch a single channel with error handling
Args:
channel_idx: The channel index to fetch
Returns:
Channel info dictionary or None if not configured
"""
try:
# Create a future to capture the channel info event
channel_event = None
event_received = asyncio.Event()
async def on_channel_info(event):
nonlocal channel_event
if event.payload.get('channel_idx') == channel_idx:
channel_event = event
event_received.set()
# Subscribe to channel info events
subscription = self.bot.meshcore.subscribe(EventType.CHANNEL_INFO, on_channel_info)
try:
# Send the command (suppress raw JSON output)
from meshcore_cli.meshcore_cli import next_cmd
with open(os.devnull, 'w') as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
await next_cmd(self.bot.meshcore, ["get_channel", str(channel_idx)])
finally:
sys.stdout = old_stdout
# Wait for the event with timeout
try:
await asyncio.wait_for(event_received.wait(), timeout=self._fetch_timeout)
except asyncio.TimeoutError:
self.logger.debug(f"Timeout waiting for channel {channel_idx} response")
return None
# Check if we got the channel info
if channel_event and channel_event.payload:
payload = channel_event.payload
# Store channel key for decryption
channel_secret = payload.get('channel_secret', b'')
if isinstance(channel_secret, bytes) and len(channel_secret) == 16:
# Convert to hex for easier handling
payload['channel_key_hex'] = channel_secret.hex()
# Check if this is an empty channel (all-zero channel secret)
if isinstance(channel_secret, bytes) and channel_secret == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00':
self.logger.debug(f"Channel {channel_idx} is empty (all-zero secret)")
return None
return payload
else:
self.logger.debug(f"No channel {channel_idx} found")
return None
finally:
# Unsubscribe
self.bot.meshcore.unsubscribe(subscription)
except asyncio.TimeoutError:
self.logger.debug(f"Timeout fetching channel {channel_idx}")
return None
except Exception as e:
self.logger.debug(f"Error fetching channel {channel_idx}: {e}")
return None
def _get_cached_channels(self) -> List[Dict[str, Any]]:
"""Get channels from cache, sorted by index"""
return [
self._channels_cache[idx]
for idx in sorted(self._channels_cache.keys())
]
async def get_channel(self, channel_idx: int, use_cache: bool = True) -> Optional[Dict[str, Any]]:
"""
Get a specific channel, optionally from cache
Args:
channel_idx: The channel index
use_cache: If True, return from cache if available
Returns:
Channel info dictionary or None
"""
if use_cache and channel_idx in self._channels_cache:
return self._channels_cache[channel_idx]
result = await self._fetch_single_channel(channel_idx)
if result:
self._channels_cache[channel_idx] = result
return result
def get_channel_name(self, channel_num: int) -> str:
"""Get channel name from channel number"""
if channel_num in self._channels_cache:
channel_info = self._channels_cache[channel_num]
return channel_info.get('channel_name', f"Channel{channel_num}")
else:
self.logger.warning(f"Channel {channel_num} not found in cached channels")
return f"Channel{channel_num}"
def get_channel_number(self, channel_name: str) -> Optional[int]:
"""
Get channel number from channel name
Args:
channel_name: The channel name to look up
Returns:
Channel number if found, None if not found (to distinguish from channel 0)
"""
for num, channel_info in self._channels_cache.items():
if channel_info.get('channel_name', '').lower() == channel_name.lower():
return num
self.logger.warning(f"Channel name '{channel_name}' not found in cached channels")
return None
def get_channel_key(self, channel_num: int) -> str:
"""Get channel encryption key from channel number"""
if channel_num in self._channels_cache:
channel_info = self._channels_cache[channel_num]
return channel_info.get('channel_key_hex', '')
return ''
def get_channel_info(self, channel_num: int) -> dict:
"""Get complete channel information including name and key"""
if channel_num in self._channels_cache:
channel_info = self._channels_cache[channel_num]
return {
'name': self.get_channel_name(channel_num),
'key': self.get_channel_key(channel_num),
'info': channel_info
}
return {'name': f"Channel{channel_num}", 'key': '', 'info': {}}
def get_channel_by_name(self, name: str) -> Optional[Dict[str, Any]]:
"""
Find a channel by name from cache
Args:
name: The channel name to search for
Returns:
Channel info dictionary or None
"""
if not self._cache_valid:
self.logger.warning("Cache not valid, call fetch_all_channels() first")
return None
name_lower = name.lower()
for channel in self._channels_cache.values():
if channel.get("channel_name", "").lower() == name_lower:
return channel
return None
def get_configured_channels(self) -> List[Dict[str, Any]]:
"""
Get only configured channels from cache
Returns:
List of configured channels
"""
if not self._cache_valid:
self.logger.warning("Cache not valid, call fetch_all_channels() first")
return []
return [
ch for ch in self._channels_cache.values()
if ch.get("channel_name") and ch["channel_name"].strip()
]
def invalidate_cache(self):
"""Invalidate the channels cache"""
self._cache_valid = False
self.logger.debug("Channels cache invalidated")
@staticmethod
def generate_hashtag_key(channel_name: str) -> bytes:
"""
Generate a hashtag channel key from the channel name
The key is the first 16 bytes of the SHA256 hash of the channel name
(including the # symbol), converted to lowercase.
Args:
channel_name: The channel name (e.g., "#general" or "general")
Returns:
16-byte key for the hashtag channel
"""
# Ensure channel name starts with # and is lowercase
if not channel_name.startswith('#'):
channel_name = '#' + channel_name
channel_name_lower = channel_name.lower()
# Compute SHA256 hash
hash_obj = hashlib.sha256(channel_name_lower.encode('utf-8'))
hash_bytes = hash_obj.digest()
# Take first 16 bytes
return hash_bytes[:16]
async def add_hashtag_channel(self, channel_idx: int, channel_name: str) -> bool:
"""
Add or update a hashtag channel on the radio
Hashtag channels use publicly derivable keys based on the channel name.
The firmware automatically generates the key when the channel name starts with #.
Args:
channel_idx: The channel index (0-39)
channel_name: The name of the channel (with or without # prefix)
Returns:
True if successful, False otherwise
"""
# Ensure channel name has # prefix for consistency
if not channel_name.startswith('#'):
channel_name = '#' + channel_name
self.logger.info(f"Adding hashtag channel {channel_idx}: {channel_name}")
# Use the simplified add_channel method - firmware will auto-generate key
return await self.add_channel(channel_idx, channel_name)
async def add_channel(self, channel_idx: int, channel_name: str, channel_secret: Optional[bytes] = None, channel_secret_hex: Optional[str] = None) -> bool:
"""
Add or update a channel on the radio
For hashtag channels (name starts with #), the firmware automatically generates the key.
For custom channels, provide either channel_secret (bytes) or channel_secret_hex (hex string).
Args:
channel_idx: The channel index (0-39)
channel_name: The name of the channel
channel_secret: Optional 16-byte encryption key for custom channels
channel_secret_hex: Optional hex string (32 chars) for the encryption key. Takes precedence over channel_secret.
Returns:
True if successful, False otherwise
"""
if not self.bot.connected or not self.bot.meshcore:
self.logger.error("Not connected to MeshCore node")
return False
if channel_idx < 0 or channel_idx >= self.max_channels:
self.logger.error(f"Channel index {channel_idx} out of range (0-{self.max_channels-1})")
return False
try:
# Check if this is a hashtag channel (firmware auto-generates key)
is_hashtag = channel_name.startswith('#')
# For custom channels, validate and prepare the key
if not is_hashtag:
if channel_secret_hex:
# Validate hex string
if len(channel_secret_hex) != 32:
self.logger.error(f"Channel secret hex must be exactly 32 characters (16 bytes), got {len(channel_secret_hex)}")
return False
try:
channel_secret = bytes.fromhex(channel_secret_hex)
except ValueError as e:
self.logger.error(f"Invalid hex string for channel secret: {e}")
return False
elif channel_secret is None:
self.logger.error("Custom channel requires a channel key (channel_secret or channel_secret_hex)")
return False
elif len(channel_secret) != 16:
self.logger.error(f"Channel secret must be exactly 16 bytes, got {len(channel_secret)}")
return False
self.logger.info(f"Adding custom channel {channel_idx}: {channel_name} (key: {channel_secret.hex()[:8]}...)")
else:
self.logger.info(f"Adding hashtag channel {channel_idx}: {channel_name} (firmware will auto-generate key)")
# Use meshcore.commands.set_channel API directly
if hasattr(self.bot.meshcore, 'commands') and hasattr(self.bot.meshcore.commands, 'set_channel'):
# For hashtag channels, just pass the name (firmware generates key)
if is_hashtag:
res = await self.bot.meshcore.commands.set_channel(channel_idx, channel_name)
else:
# For custom channels, we need to pass the key
# Check if set_channel accepts a key parameter
# Try with key as third parameter
try:
res = await self.bot.meshcore.commands.set_channel(channel_idx, channel_name, channel_secret)
except TypeError:
# If that doesn't work, try with hex string
try:
res = await self.bot.meshcore.commands.set_channel(channel_idx, channel_name, channel_secret_hex or channel_secret.hex())
except TypeError:
# Fallback to CLI method if API doesn't support key parameter
self.logger.warning("meshcore.commands.set_channel doesn't accept key parameter, using CLI fallback")
return await self._add_channel_via_cli(channel_idx, channel_name, channel_secret.hex() if channel_secret else channel_secret_hex)
# Check for errors
if hasattr(res, 'type') and res.type == EventType.ERROR:
self.logger.error(f"Failed to set channel {channel_idx}: {res.payload if hasattr(res, 'payload') else 'Unknown error'}")
return False
# Fetch the channel back to get the generated key and verify
res = await self.bot.meshcore.commands.get_channel(channel_idx)
if hasattr(res, 'type') and res.type == EventType.ERROR:
self.logger.error(f"Failed to get channel {channel_idx} after setting: {res.payload if hasattr(res, 'payload') else 'Unknown error'}")
return False
# Extract channel info from response
if hasattr(res, 'payload'):
channel_info = res.payload
else:
# Fallback: try to get from event subscription
channel_info = await self._fetch_single_channel(channel_idx)
if not channel_info:
self.logger.error(f"Could not retrieve channel {channel_idx} after setting")
return False
# Verify channel was set correctly
if channel_info.get('channel_name') != channel_name:
self.logger.error(f"Channel name mismatch: expected {channel_name}, got {channel_info.get('channel_name')}")
return False
# For custom channels, verify the key matches
if not is_hashtag:
channel_secret_from_device = channel_info.get('channel_secret', b'')
if isinstance(channel_secret_from_device, bytes) and channel_secret_from_device != channel_secret:
self.logger.error(f"Channel key mismatch for custom channel {channel_idx}")
return False
# Update cache and database
channel_info['channel_key_hex'] = channel_info.get('channel_secret', b'').hex() if isinstance(channel_info.get('channel_secret'), bytes) else ''
self._channels_cache[channel_idx] = channel_info
self._store_single_channel_in_db(channel_info)
self.logger.info(f"Successfully added channel {channel_idx}: {channel_name}")
return True
else:
# Fallback to CLI method if commands API not available
self.logger.warning("meshcore.commands.set_channel not available, using CLI fallback")
channel_secret_hex = channel_secret.hex() if channel_secret else channel_secret_hex
if is_hashtag:
# For hashtag, generate key ourselves as fallback
channel_secret = self.generate_hashtag_key(channel_name)
channel_secret_hex = channel_secret.hex()
return await self._add_channel_via_cli(channel_idx, channel_name, channel_secret_hex)
except Exception as e:
self.logger.error(f"Error adding channel {channel_idx}: {e}")
import traceback
self.logger.debug(traceback.format_exc())
return False
async def _add_channel_via_cli(self, channel_idx: int, channel_name: str, channel_secret_hex: str) -> bool:
"""
Fallback method to add channel using CLI wrapper (for older meshcore versions)
Args:
channel_idx: The channel index
channel_name: The channel name
channel_secret_hex: The channel key as hex string
Returns:
True if successful, False otherwise
"""
try:
# Subscribe to channel info events to confirm the channel was set
channel_set = False
event_received = asyncio.Event()
async def on_channel_info(event):
nonlocal channel_set
# Copy payload immediately to avoid segfault if event is freed
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
if payload and payload.get('channel_idx') == channel_idx:
if payload.get('channel_name') == channel_name:
channel_set = True
event_received.set()
subscription = self.bot.meshcore.subscribe(EventType.CHANNEL_INFO, on_channel_info)
try:
from meshcore_cli.meshcore_cli import next_cmd
# Suppress raw JSON output
with open(os.devnull, 'w') as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
await next_cmd(
self.bot.meshcore,
["set_channel", str(channel_idx), channel_name, channel_secret_hex]
)
finally:
sys.stdout = old_stdout
# Wait for confirmation with timeout
try:
await asyncio.wait_for(event_received.wait(), timeout=self._fetch_timeout * 2)
except asyncio.TimeoutError:
self.logger.warning(f"Timeout waiting for channel {channel_idx} set confirmation")
await asyncio.sleep(0.5)
result = await self._fetch_single_channel(channel_idx)
if result and result.get('channel_name') == channel_name:
channel_set = True
if channel_set:
# Update cache
result = await self._fetch_single_channel(channel_idx)
if result:
self._channels_cache[channel_idx] = result
self._store_single_channel_in_db(result)
self.logger.info(f"Successfully added channel {channel_idx}: {channel_name}")
return True
else:
self.logger.warning(f"Channel {channel_idx} was set but could not be verified")
return False
else:
self.logger.error(f"Failed to set channel {channel_idx}")
return False
finally:
self.bot.meshcore.unsubscribe(subscription)
except Exception as e:
self.logger.error(f"Error in CLI fallback for channel {channel_idx}: {e}")
return False
async def remove_channel(self, channel_idx: int) -> bool:
"""
Remove a channel from the radio by clearing it
Args:
channel_idx: The channel index to remove
Returns:
True if successful, False otherwise
"""
if not self.bot.connected or not self.bot.meshcore:
self.logger.error("Not connected to MeshCore node")
return False
if channel_idx < 0 or channel_idx >= self.max_channels:
self.logger.error(f"Channel index {channel_idx} out of range (0-{self.max_channels-1})")
return False
try:
self.logger.info(f"Removing channel {channel_idx}")
# Create all-zero channel secret (16 bytes) to clear the channel
empty_secret = b'\x00' * 16
empty_secret_hex = empty_secret.hex()
# Subscribe to channel info events to confirm the channel was cleared
channel_cleared = False
event_received = asyncio.Event()
async def on_channel_info(event):
nonlocal channel_cleared
# Copy payload immediately to avoid segfault if event is freed
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
if payload and payload.get('channel_idx') == channel_idx:
event_secret = payload.get('channel_secret', b'')
# Check if the channel was cleared (all zeros or empty name)
if isinstance(event_secret, bytes) and event_secret == empty_secret:
channel_cleared = True
event_received.set()
elif not payload.get('channel_name') or payload.get('channel_name') == '':
channel_cleared = True
event_received.set()
subscription = self.bot.meshcore.subscribe(EventType.CHANNEL_INFO, on_channel_info)
try:
from meshcore_cli.meshcore_cli import next_cmd
# Suppress raw JSON output
with open(os.devnull, 'w') as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
# Clear the channel by setting it with empty name and all-zero secret
# Format: set_channel <idx> "" <empty_secret_hex>
await next_cmd(
self.bot.meshcore,
["set_channel", str(channel_idx), "", empty_secret_hex]
)
finally:
sys.stdout = old_stdout
# Wait for confirmation with timeout
try:
await asyncio.wait_for(event_received.wait(), timeout=self._fetch_timeout * 2)
except asyncio.TimeoutError:
self.logger.warning(f"Timeout waiting for channel {channel_idx} removal confirmation")
# Still try to verify by fetching the channel
await asyncio.sleep(0.5)
result = await self._fetch_single_channel(channel_idx)
if not result or not result.get('channel_name'):
channel_cleared = True
if channel_cleared:
# Remove from cache
if channel_idx in self._channels_cache:
del self._channels_cache[channel_idx]
# Update database - remove the channel
try:
with self.bot.db_manager.connection() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM channels WHERE channel_idx = ?', (channel_idx,))
conn.commit()
except Exception as e:
self.logger.warning(f"Failed to remove channel from database: {e}")
self.logger.info(f"Successfully removed channel {channel_idx}")
return True
else:
self.logger.error(f"Failed to remove channel {channel_idx}")
return False
finally:
# Unsubscribe
self.bot.meshcore.unsubscribe(subscription)
except Exception as e:
self.logger.error(f"Error removing channel {channel_idx}: {e}")
return False