mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-04-26 10:58:04 +00:00
Refactor database connection handling in web viewer and improve error logging; fix keyword reporting to web viewer
- Simplified database connection management by using context managers to ensure connections are properly closed. - Enhanced error handling during MQTT client disconnection in packet capture service, logging specific exceptions. - Updated message handling in MessageHandler to capture command data for web viewer integration, improving response tracking.
This commit is contained in:
@@ -1757,14 +1757,22 @@ class MessageHandler:
|
||||
# response is not None here, so we know a response will be sent
|
||||
stats_command.record_command(message, keyword, True)
|
||||
|
||||
# Note: Command data capture is handled in command_manager.py after execution
|
||||
# to avoid duplicate messages to web viewer
|
||||
|
||||
# Send response
|
||||
if message.is_dm:
|
||||
await self.bot.command_manager.send_dm(message.sender_id, response)
|
||||
success = await self.bot.command_manager.send_dm(message.sender_id, response)
|
||||
else:
|
||||
await self.bot.command_manager.send_channel_message(message.channel, response)
|
||||
success = await self.bot.command_manager.send_channel_message(message.channel, response)
|
||||
|
||||
# Capture keyword command data for web viewer
|
||||
if (hasattr(self.bot, 'web_viewer_integration') and
|
||||
self.bot.web_viewer_integration and
|
||||
self.bot.web_viewer_integration.bot_integration):
|
||||
try:
|
||||
self.bot.web_viewer_integration.bot_integration.capture_command(
|
||||
message, keyword, response, success
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Failed to capture keyword data for web viewer: {e}")
|
||||
|
||||
# Only execute commands if no help response was sent and no plugin command with response was matched
|
||||
# Help responses and plugin commands with responses should be the final response for that message
|
||||
|
||||
@@ -331,8 +331,12 @@ class PacketCaptureService(BaseServicePlugin):
|
||||
try:
|
||||
mqtt_client_info['client'].disconnect()
|
||||
mqtt_client_info['client'].loop_stop()
|
||||
except:
|
||||
pass
|
||||
except (AttributeError, RuntimeError, OSError) as e:
|
||||
# Silently ignore expected errors during cleanup (client already disconnected, etc.)
|
||||
self.logger.debug(f"Error disconnecting MQTT client during cleanup: {e}")
|
||||
except Exception as e:
|
||||
# Log unexpected errors but don't fail cleanup
|
||||
self.logger.warning(f"Unexpected error disconnecting MQTT client: {e}")
|
||||
|
||||
# Close output file
|
||||
if self.output_handle:
|
||||
|
||||
@@ -187,44 +187,38 @@ class BotDataViewer:
|
||||
db_path = resolve_path(db_path, self.bot_root)
|
||||
|
||||
# Connect to database and create table if it doesn't exist
|
||||
conn = sqlite3.connect(db_path, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create packet_stream table with schema matching the INSERT statements
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS packet_stream (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp REAL NOT NULL,
|
||||
data TEXT NOT NULL,
|
||||
type TEXT NOT NULL
|
||||
)
|
||||
''')
|
||||
|
||||
# Create index on timestamp for faster queries
|
||||
cursor.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp
|
||||
ON packet_stream(timestamp)
|
||||
''')
|
||||
|
||||
# Create index on type for filtering by type
|
||||
cursor.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_packet_stream_type
|
||||
ON packet_stream(type)
|
||||
''')
|
||||
|
||||
conn.commit()
|
||||
|
||||
self.logger.info(f"Initialized packet_stream table in {db_path}")
|
||||
with sqlite3.connect(db_path, timeout=30) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create packet_stream table with schema matching the INSERT statements
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS packet_stream (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp REAL NOT NULL,
|
||||
data TEXT NOT NULL,
|
||||
type TEXT NOT NULL
|
||||
)
|
||||
''')
|
||||
|
||||
# Create index on timestamp for faster queries
|
||||
cursor.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp
|
||||
ON packet_stream(timestamp)
|
||||
''')
|
||||
|
||||
# Create index on type for filtering by type
|
||||
cursor.execute('''
|
||||
CREATE INDEX IF NOT EXISTS idx_packet_stream_type
|
||||
ON packet_stream(type)
|
||||
''')
|
||||
|
||||
conn.commit()
|
||||
|
||||
self.logger.info(f"Initialized packet_stream table in {db_path}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to initialize packet_stream table: {e}")
|
||||
# Don't raise - allow web viewer to continue even if table init fails
|
||||
finally:
|
||||
if conn:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error closing init connection: {e}")
|
||||
|
||||
def _get_db_connection(self):
|
||||
"""Get database connection - create new connection for each request to avoid threading issues"""
|
||||
@@ -384,7 +378,6 @@ class BotDataViewer:
|
||||
@self.app.route('/api/recent_commands')
|
||||
def api_recent_commands():
|
||||
"""API endpoint to get recent commands from database"""
|
||||
conn = None
|
||||
try:
|
||||
import sqlite3
|
||||
import json
|
||||
@@ -399,38 +392,32 @@ class BotDataViewer:
|
||||
# Resolve database path (relative paths resolved from bot root, absolute paths used as-is)
|
||||
db_path = resolve_path(db_path, self.bot_root)
|
||||
|
||||
conn = sqlite3.connect(db_path, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
SELECT data FROM packet_stream
|
||||
WHERE type = 'command' AND timestamp > ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT 100
|
||||
''', (cutoff_time,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Parse and return commands
|
||||
commands = []
|
||||
for (data_json,) in rows:
|
||||
try:
|
||||
command_data = json.loads(data_json)
|
||||
commands.append(command_data)
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error parsing command data: {e}")
|
||||
|
||||
return jsonify({'commands': commands})
|
||||
with sqlite3.connect(db_path, timeout=30) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
SELECT data FROM packet_stream
|
||||
WHERE type = 'command' AND timestamp > ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT 100
|
||||
''', (cutoff_time,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Parse and return commands
|
||||
commands = []
|
||||
for (data_json,) in rows:
|
||||
try:
|
||||
command_data = json.loads(data_json)
|
||||
commands.append(command_data)
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error parsing command data: {e}")
|
||||
|
||||
return jsonify({'commands': commands})
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error getting recent commands: {e}")
|
||||
return jsonify({'error': str(e)}), 500
|
||||
finally:
|
||||
if conn:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error closing recent_commands connection: {e}")
|
||||
|
||||
@self.app.route('/api/geocode-contact', methods=['POST'])
|
||||
def api_geocode_contact():
|
||||
@@ -1415,7 +1402,6 @@ class BotDataViewer:
|
||||
max_consecutive_errors = 10
|
||||
|
||||
while True:
|
||||
conn = None
|
||||
try:
|
||||
import time
|
||||
import sqlite3
|
||||
@@ -1428,37 +1414,37 @@ class BotDataViewer:
|
||||
db_path = resolve_path(db_path, self.bot_root)
|
||||
|
||||
# Connect to database with timeout to prevent hanging
|
||||
conn = sqlite3.connect(db_path, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get new data since last poll
|
||||
cursor.execute('''
|
||||
SELECT timestamp, data, type FROM packet_stream
|
||||
WHERE timestamp > ?
|
||||
ORDER BY timestamp ASC
|
||||
''', (last_timestamp,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Process new data
|
||||
for timestamp, data_json, data_type in rows:
|
||||
try:
|
||||
data = json.loads(data_json)
|
||||
|
||||
# Broadcast based on type
|
||||
if data_type == 'command':
|
||||
self._handle_command_data(data)
|
||||
elif data_type == 'packet':
|
||||
self._handle_packet_data(data)
|
||||
elif data_type == 'routing':
|
||||
self._handle_packet_data(data) # Treat routing as packet data
|
||||
with sqlite3.connect(db_path, timeout=30) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get new data since last poll
|
||||
cursor.execute('''
|
||||
SELECT timestamp, data, type FROM packet_stream
|
||||
WHERE timestamp > ?
|
||||
ORDER BY timestamp ASC
|
||||
''', (last_timestamp,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Process new data
|
||||
for timestamp, data_json, data_type in rows:
|
||||
try:
|
||||
data = json.loads(data_json)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Error processing database data: {e}")
|
||||
|
||||
# Update last timestamp
|
||||
if rows:
|
||||
last_timestamp = rows[-1][0]
|
||||
# Broadcast based on type
|
||||
if data_type == 'command':
|
||||
self._handle_command_data(data)
|
||||
elif data_type == 'packet':
|
||||
self._handle_packet_data(data)
|
||||
elif data_type == 'routing':
|
||||
self._handle_packet_data(data) # Treat routing as packet data
|
||||
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Error processing database data: {e}")
|
||||
|
||||
# Update last timestamp
|
||||
if rows:
|
||||
last_timestamp = rows[-1][0]
|
||||
|
||||
# Reset error counter on success
|
||||
consecutive_errors = 0
|
||||
@@ -1491,13 +1477,6 @@ class BotDataViewer:
|
||||
self.logger.warning(f"Database polling unexpected error (attempt {consecutive_errors}): {e}")
|
||||
time.sleep(2)
|
||||
|
||||
finally:
|
||||
# Always close connection, even on error
|
||||
if conn:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error closing database connection: {e}")
|
||||
|
||||
# Start polling thread
|
||||
polling_thread = threading.Thread(target=poll_database, daemon=True)
|
||||
@@ -1529,7 +1508,6 @@ class BotDataViewer:
|
||||
|
||||
def _cleanup_old_data(self, days_to_keep: int = 7):
|
||||
"""Clean up old packet stream data to prevent database bloat"""
|
||||
conn = None
|
||||
try:
|
||||
import sqlite3
|
||||
import time
|
||||
@@ -1543,26 +1521,20 @@ class BotDataViewer:
|
||||
db_path = resolve_path(db_path, self.bot_root)
|
||||
|
||||
# Use timeout to prevent hanging
|
||||
conn = sqlite3.connect(db_path, timeout=30)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Clean up old packet stream data
|
||||
cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,))
|
||||
deleted_count = cursor.rowcount
|
||||
|
||||
conn.commit()
|
||||
|
||||
if deleted_count > 0:
|
||||
self.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)")
|
||||
with sqlite3.connect(db_path, timeout=30) as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Clean up old packet stream data
|
||||
cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,))
|
||||
deleted_count = cursor.rowcount
|
||||
|
||||
conn.commit()
|
||||
|
||||
if deleted_count > 0:
|
||||
self.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error cleaning up old packet stream data: {e}")
|
||||
finally:
|
||||
if conn:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error closing cleanup connection: {e}")
|
||||
|
||||
def _get_database_stats(self, top_users_window='all', top_commands_window='all',
|
||||
top_paths_window='all', top_channels_window='all'):
|
||||
|
||||
Reference in New Issue
Block a user