From 80b6bcceead288cdba431a2ea674ccae5631f72a Mon Sep 17 00:00:00 2001 From: agessaman Date: Thu, 1 Jan 2026 12:16:22 -0800 Subject: [PATCH] Refactor database connection handling in web viewer and improve error logging; fix keyword reporting to web viewer - Simplified database connection management by using context managers to ensure connections are properly closed. - Enhanced error handling during MQTT client disconnection in packet capture service, logging specific exceptions. - Updated message handling in MessageHandler to capture command data for web viewer integration, improving response tracking. --- modules/message_handler.py | 18 +- .../service_plugins/packet_capture_service.py | 8 +- modules/web_viewer/app.py | 210 ++++++++---------- 3 files changed, 110 insertions(+), 126 deletions(-) diff --git a/modules/message_handler.py b/modules/message_handler.py index adf2573..a0775ed 100644 --- a/modules/message_handler.py +++ b/modules/message_handler.py @@ -1757,14 +1757,22 @@ class MessageHandler: # response is not None here, so we know a response will be sent stats_command.record_command(message, keyword, True) - # Note: Command data capture is handled in command_manager.py after execution - # to avoid duplicate messages to web viewer - # Send response if message.is_dm: - await self.bot.command_manager.send_dm(message.sender_id, response) + success = await self.bot.command_manager.send_dm(message.sender_id, response) else: - await self.bot.command_manager.send_channel_message(message.channel, response) + success = await self.bot.command_manager.send_channel_message(message.channel, response) + + # Capture keyword command data for web viewer + if (hasattr(self.bot, 'web_viewer_integration') and + self.bot.web_viewer_integration and + self.bot.web_viewer_integration.bot_integration): + try: + self.bot.web_viewer_integration.bot_integration.capture_command( + message, keyword, response, success + ) + except Exception as e: + self.logger.debug(f"Failed to capture keyword data for web viewer: {e}") # Only execute commands if no help response was sent and no plugin command with response was matched # Help responses and plugin commands with responses should be the final response for that message diff --git a/modules/service_plugins/packet_capture_service.py b/modules/service_plugins/packet_capture_service.py index 4877439..008eb3b 100644 --- a/modules/service_plugins/packet_capture_service.py +++ b/modules/service_plugins/packet_capture_service.py @@ -331,8 +331,12 @@ class PacketCaptureService(BaseServicePlugin): try: mqtt_client_info['client'].disconnect() mqtt_client_info['client'].loop_stop() - except: - pass + except (AttributeError, RuntimeError, OSError) as e: + # Silently ignore expected errors during cleanup (client already disconnected, etc.) + self.logger.debug(f"Error disconnecting MQTT client during cleanup: {e}") + except Exception as e: + # Log unexpected errors but don't fail cleanup + self.logger.warning(f"Unexpected error disconnecting MQTT client: {e}") # Close output file if self.output_handle: diff --git a/modules/web_viewer/app.py b/modules/web_viewer/app.py index 2a351b4..bcb8b7e 100644 --- a/modules/web_viewer/app.py +++ b/modules/web_viewer/app.py @@ -187,44 +187,38 @@ class BotDataViewer: db_path = resolve_path(db_path, self.bot_root) # Connect to database and create table if it doesn't exist - conn = sqlite3.connect(db_path, timeout=30) - cursor = conn.cursor() - - # Create packet_stream table with schema matching the INSERT statements - cursor.execute(''' - CREATE TABLE IF NOT EXISTS packet_stream ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp REAL NOT NULL, - data TEXT NOT NULL, - type TEXT NOT NULL - ) - ''') - - # Create index on timestamp for faster queries - cursor.execute(''' - CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp - ON packet_stream(timestamp) - ''') - - # Create index on type for filtering by type - cursor.execute(''' - CREATE INDEX IF NOT EXISTS idx_packet_stream_type - ON packet_stream(type) - ''') - - conn.commit() - - self.logger.info(f"Initialized packet_stream table in {db_path}") + with sqlite3.connect(db_path, timeout=30) as conn: + cursor = conn.cursor() + + # Create packet_stream table with schema matching the INSERT statements + cursor.execute(''' + CREATE TABLE IF NOT EXISTS packet_stream ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL NOT NULL, + data TEXT NOT NULL, + type TEXT NOT NULL + ) + ''') + + # Create index on timestamp for faster queries + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp + ON packet_stream(timestamp) + ''') + + # Create index on type for filtering by type + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_packet_stream_type + ON packet_stream(type) + ''') + + conn.commit() + + self.logger.info(f"Initialized packet_stream table in {db_path}") except Exception as e: self.logger.error(f"Failed to initialize packet_stream table: {e}") # Don't raise - allow web viewer to continue even if table init fails - finally: - if conn: - try: - conn.close() - except Exception as e: - self.logger.debug(f"Error closing init connection: {e}") def _get_db_connection(self): """Get database connection - create new connection for each request to avoid threading issues""" @@ -384,7 +378,6 @@ class BotDataViewer: @self.app.route('/api/recent_commands') def api_recent_commands(): """API endpoint to get recent commands from database""" - conn = None try: import sqlite3 import json @@ -399,38 +392,32 @@ class BotDataViewer: # Resolve database path (relative paths resolved from bot root, absolute paths used as-is) db_path = resolve_path(db_path, self.bot_root) - conn = sqlite3.connect(db_path, timeout=30) - cursor = conn.cursor() - - cursor.execute(''' - SELECT data FROM packet_stream - WHERE type = 'command' AND timestamp > ? - ORDER BY timestamp DESC - LIMIT 100 - ''', (cutoff_time,)) - - rows = cursor.fetchall() - - # Parse and return commands - commands = [] - for (data_json,) in rows: - try: - command_data = json.loads(data_json) - commands.append(command_data) - except Exception as e: - self.logger.debug(f"Error parsing command data: {e}") - - return jsonify({'commands': commands}) + with sqlite3.connect(db_path, timeout=30) as conn: + cursor = conn.cursor() + + cursor.execute(''' + SELECT data FROM packet_stream + WHERE type = 'command' AND timestamp > ? + ORDER BY timestamp DESC + LIMIT 100 + ''', (cutoff_time,)) + + rows = cursor.fetchall() + + # Parse and return commands + commands = [] + for (data_json,) in rows: + try: + command_data = json.loads(data_json) + commands.append(command_data) + except Exception as e: + self.logger.debug(f"Error parsing command data: {e}") + + return jsonify({'commands': commands}) except Exception as e: self.logger.error(f"Error getting recent commands: {e}") return jsonify({'error': str(e)}), 500 - finally: - if conn: - try: - conn.close() - except Exception as e: - self.logger.debug(f"Error closing recent_commands connection: {e}") @self.app.route('/api/geocode-contact', methods=['POST']) def api_geocode_contact(): @@ -1415,7 +1402,6 @@ class BotDataViewer: max_consecutive_errors = 10 while True: - conn = None try: import time import sqlite3 @@ -1428,37 +1414,37 @@ class BotDataViewer: db_path = resolve_path(db_path, self.bot_root) # Connect to database with timeout to prevent hanging - conn = sqlite3.connect(db_path, timeout=30) - cursor = conn.cursor() - - # Get new data since last poll - cursor.execute(''' - SELECT timestamp, data, type FROM packet_stream - WHERE timestamp > ? - ORDER BY timestamp ASC - ''', (last_timestamp,)) - - rows = cursor.fetchall() - - # Process new data - for timestamp, data_json, data_type in rows: - try: - data = json.loads(data_json) - - # Broadcast based on type - if data_type == 'command': - self._handle_command_data(data) - elif data_type == 'packet': - self._handle_packet_data(data) - elif data_type == 'routing': - self._handle_packet_data(data) # Treat routing as packet data + with sqlite3.connect(db_path, timeout=30) as conn: + cursor = conn.cursor() + + # Get new data since last poll + cursor.execute(''' + SELECT timestamp, data, type FROM packet_stream + WHERE timestamp > ? + ORDER BY timestamp ASC + ''', (last_timestamp,)) + + rows = cursor.fetchall() + + # Process new data + for timestamp, data_json, data_type in rows: + try: + data = json.loads(data_json) - except Exception as e: - self.logger.warning(f"Error processing database data: {e}") - - # Update last timestamp - if rows: - last_timestamp = rows[-1][0] + # Broadcast based on type + if data_type == 'command': + self._handle_command_data(data) + elif data_type == 'packet': + self._handle_packet_data(data) + elif data_type == 'routing': + self._handle_packet_data(data) # Treat routing as packet data + + except Exception as e: + self.logger.warning(f"Error processing database data: {e}") + + # Update last timestamp + if rows: + last_timestamp = rows[-1][0] # Reset error counter on success consecutive_errors = 0 @@ -1491,13 +1477,6 @@ class BotDataViewer: self.logger.warning(f"Database polling unexpected error (attempt {consecutive_errors}): {e}") time.sleep(2) - finally: - # Always close connection, even on error - if conn: - try: - conn.close() - except Exception as e: - self.logger.debug(f"Error closing database connection: {e}") # Start polling thread polling_thread = threading.Thread(target=poll_database, daemon=True) @@ -1529,7 +1508,6 @@ class BotDataViewer: def _cleanup_old_data(self, days_to_keep: int = 7): """Clean up old packet stream data to prevent database bloat""" - conn = None try: import sqlite3 import time @@ -1543,26 +1521,20 @@ class BotDataViewer: db_path = resolve_path(db_path, self.bot_root) # Use timeout to prevent hanging - conn = sqlite3.connect(db_path, timeout=30) - cursor = conn.cursor() - - # Clean up old packet stream data - cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,)) - deleted_count = cursor.rowcount - - conn.commit() - - if deleted_count > 0: - self.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)") + with sqlite3.connect(db_path, timeout=30) as conn: + cursor = conn.cursor() + + # Clean up old packet stream data + cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,)) + deleted_count = cursor.rowcount + + conn.commit() + + if deleted_count > 0: + self.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)") except Exception as e: self.logger.error(f"Error cleaning up old packet stream data: {e}") - finally: - if conn: - try: - conn.close() - except Exception as e: - self.logger.debug(f"Error closing cleanup connection: {e}") def _get_database_stats(self, top_users_window='all', top_commands_window='all', top_paths_window='all', top_channels_window='all'):