mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-04-03 22:15:40 +00:00
- Added a regex pattern to validate SQLite column definitions, preventing SQL injection in the _add_column() function. - Introduced a new validation function to ensure safe column definitions are used. - Updated repeater_manager.py to use list() for iterating over contacts, ensuring compatibility with potential changes in the underlying data structure. - Enhanced error handling in the MessageScheduler for better debugging during scheduler shutdown and message sending. These changes improve the security and reliability of database operations and enhance the robustness of the repeater management system.
836 lines
35 KiB
Python
836 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web Viewer Integration for MeshCore Bot
|
|
Provides integration between the main bot and the web viewer
|
|
"""
|
|
|
|
import os
|
|
import queue
|
|
import re
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from contextlib import closing, suppress
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from ..utils import resolve_path
|
|
|
|
|
|
class BotIntegration:
|
|
"""Simple bot integration for web viewer compatibility"""
|
|
|
|
# After this many consecutive connection failures, stop sending until cooldown expires
|
|
CIRCUIT_BREAKER_THRESHOLD = 3
|
|
CIRCUIT_BREAKER_COOLDOWN_SEC = 60
|
|
|
|
# How often (seconds) the drain thread flushes the write queue
|
|
DRAIN_INTERVAL = 0.5
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.circuit_breaker_open = False
|
|
self.circuit_breaker_failures = 0
|
|
self.circuit_breaker_last_failure_time = 0.0
|
|
self.is_shutting_down = False
|
|
# Batched write queue: avoids a per-insert sqlite3.connect() round-trip
|
|
self._write_queue: queue.Queue = queue.Queue()
|
|
self._drain_stop = threading.Event()
|
|
self._drain_thread: Optional[threading.Thread] = None
|
|
# Initialize HTTP session with connection pooling for efficient reuse
|
|
self._init_http_session()
|
|
# Generate a shared secret for authenticating internal /api/stream_data calls.
|
|
# Stored in DB metadata so the web viewer can validate it.
|
|
self._stream_token = secrets.token_hex(32)
|
|
try:
|
|
self.bot.db_manager.set_metadata('internal.stream_token', self._stream_token)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Could not persist stream token: {e}")
|
|
if getattr(self, 'http_session', None):
|
|
self.http_session.headers['X-Stream-Token'] = self._stream_token
|
|
# Start background drain thread after table is confirmed to exist
|
|
self._start_drain_thread()
|
|
|
|
def _init_http_session(self):
|
|
"""Initialize a requests.Session with connection pooling and keep-alive"""
|
|
try:
|
|
import logging
|
|
|
|
import requests
|
|
import urllib3
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
# Suppress urllib3 connection pool messages when web viewer is unreachable
|
|
# Connection refused / Retrying WARNINGs would flood logs during routing bursts
|
|
urllib3_logger = logging.getLogger('urllib3.connectionpool')
|
|
urllib3_logger.setLevel(logging.ERROR)
|
|
|
|
# Also disable other urllib3 warnings
|
|
urllib3.disable_warnings(urllib3.exceptions.NotOpenSSLWarning)
|
|
|
|
self.http_session = requests.Session()
|
|
|
|
# Configure retry strategy
|
|
retry_strategy = Retry(
|
|
total=2,
|
|
backoff_factor=0.1,
|
|
status_forcelist=[429, 500, 502, 503, 504],
|
|
)
|
|
|
|
# Mount adapter with connection pooling
|
|
# pool_block=False allows non-blocking behavior if pool is full
|
|
adapter = HTTPAdapter(
|
|
pool_connections=1, # Single connection pool for web viewer
|
|
pool_maxsize=5, # Allow up to 5 connections in the pool
|
|
max_retries=retry_strategy,
|
|
pool_block=False # Don't block if pool is full
|
|
)
|
|
self.http_session.mount("http://", adapter)
|
|
self.http_session.mount("https://", adapter)
|
|
|
|
# Set default headers for keep-alive and internal auth
|
|
self.http_session.headers.update({
|
|
'Connection': 'keep-alive',
|
|
'X-Requested-With': 'BotIntegration', # CSRF bypass for internal calls
|
|
})
|
|
except ImportError:
|
|
# Fallback if requests is not available
|
|
self.http_session = None
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error initializing HTTP session: {e}")
|
|
self.http_session = None
|
|
|
|
def reset_circuit_breaker(self):
|
|
"""Reset the circuit breaker"""
|
|
self.circuit_breaker_open = False
|
|
self.circuit_breaker_failures = 0
|
|
|
|
def _should_skip_web_viewer_send(self):
|
|
"""Return True if we should skip sending (circuit open and within cooldown)."""
|
|
if not self.circuit_breaker_open:
|
|
return False
|
|
if (time.time() - self.circuit_breaker_last_failure_time) >= self.CIRCUIT_BREAKER_COOLDOWN_SEC:
|
|
self.reset_circuit_breaker()
|
|
return False
|
|
return True
|
|
|
|
def _record_web_viewer_result(self, success):
|
|
"""Update circuit breaker state after a send attempt."""
|
|
if success:
|
|
self.reset_circuit_breaker()
|
|
else:
|
|
self.circuit_breaker_failures += 1
|
|
self.circuit_breaker_last_failure_time = time.time()
|
|
if self.circuit_breaker_failures >= self.CIRCUIT_BREAKER_THRESHOLD:
|
|
self.circuit_breaker_open = True
|
|
self.bot.logger.debug(
|
|
"Web viewer unreachable after %d failures; circuit open for %ds",
|
|
self.circuit_breaker_failures,
|
|
self.CIRCUIT_BREAKER_COOLDOWN_SEC,
|
|
)
|
|
|
|
def _get_web_viewer_db_path(self):
|
|
"""Return resolved database path for web viewer. Uses [Bot] db_path when [Web_Viewer] db_path is unset."""
|
|
base_dir = self.bot.bot_root if hasattr(self.bot, 'bot_root') else '.'
|
|
if self.bot.config.has_section('Web_Viewer') and self.bot.config.has_option('Web_Viewer', 'db_path'):
|
|
raw = self.bot.config.get('Web_Viewer', 'db_path', fallback='').strip()
|
|
if raw:
|
|
return resolve_path(raw, base_dir)
|
|
return str(Path(self.bot.db_manager.db_path).resolve())
|
|
|
|
def _init_packet_stream_table(self):
|
|
"""Backward-compatible initializer (now handled by migrations).
|
|
|
|
Kept for older call sites and tests that patch this method. Safe to call
|
|
multiple times and safe to ignore failures.
|
|
"""
|
|
try:
|
|
import sqlite3
|
|
|
|
db_path = self._get_web_viewer_db_path()
|
|
with closing(sqlite3.connect(str(db_path), timeout=60.0)) as conn:
|
|
cur = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='packet_stream'"
|
|
)
|
|
if cur.fetchone() is not None:
|
|
return
|
|
try:
|
|
foreign_keys = self.bot.config.getboolean("Web_Viewer", "sqlite_foreign_keys", fallback=True)
|
|
busy_timeout_ms = self.bot.config.getint("Web_Viewer", "sqlite_busy_timeout_ms", fallback=60000)
|
|
journal_mode = self.bot.config.get("Web_Viewer", "sqlite_journal_mode", fallback="WAL").strip() or "WAL"
|
|
except Exception:
|
|
foreign_keys = True
|
|
busy_timeout_ms = 60000
|
|
journal_mode = "WAL"
|
|
conn.execute(f"PRAGMA foreign_keys={'ON' if foreign_keys else 'OFF'}")
|
|
conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
|
|
try:
|
|
conn.execute(f"PRAGMA journal_mode={journal_mode}")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS packet_stream (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
data TEXT NOT NULL,
|
|
type TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp ON packet_stream(timestamp)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packet_stream_type ON packet_stream(type)"
|
|
)
|
|
conn.commit()
|
|
except Exception:
|
|
pass
|
|
|
|
def _start_drain_thread(self) -> None:
|
|
"""Start the background thread that flushes the write queue every DRAIN_INTERVAL seconds."""
|
|
self._drain_thread = threading.Thread(
|
|
target=self._drain_loop,
|
|
name="packet-stream-drain",
|
|
daemon=True,
|
|
)
|
|
self._drain_thread.start()
|
|
|
|
def _drain_loop(self) -> None:
|
|
"""Background loop: flush the write queue every DRAIN_INTERVAL seconds."""
|
|
while not self._drain_stop.is_set():
|
|
self._drain_stop.wait(timeout=self.DRAIN_INTERVAL)
|
|
self._flush_write_queue()
|
|
|
|
def _flush_write_queue(self) -> None:
|
|
"""Drain all queued rows and insert them in a single batched transaction."""
|
|
import sqlite3
|
|
if self._write_queue.empty():
|
|
return
|
|
rows = []
|
|
while not self._write_queue.empty():
|
|
try:
|
|
rows.append(self._write_queue.get_nowait())
|
|
except queue.Empty:
|
|
break
|
|
if not rows:
|
|
return
|
|
db_path = self._get_web_viewer_db_path()
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
with closing(sqlite3.connect(str(db_path), timeout=60.0)) as conn:
|
|
conn.executemany(
|
|
'INSERT INTO packet_stream (timestamp, data, type) VALUES (?, ?, ?)',
|
|
rows,
|
|
)
|
|
conn.commit()
|
|
return
|
|
except sqlite3.OperationalError as e:
|
|
if "locked" in str(e).lower() and attempt < max_retries - 1:
|
|
time.sleep(0.15 * (attempt + 1))
|
|
continue
|
|
self.bot.logger.warning(f"Error flushing packet_stream queue ({len(rows)} rows): {e}")
|
|
return
|
|
except Exception as e:
|
|
self.bot.logger.warning(f"Error flushing packet_stream queue ({len(rows)} rows): {e}")
|
|
return
|
|
|
|
def _insert_packet_stream_row(self, data_json: str, row_type: str, log_prefix: str = "packet data"):
|
|
"""Queue one row for batched insertion into packet_stream by the drain thread."""
|
|
try:
|
|
self._write_queue.put_nowait((time.time(), data_json, row_type))
|
|
except Exception as e:
|
|
self.bot.logger.warning(f"Error queuing {log_prefix} for web viewer: {e}")
|
|
|
|
def capture_full_packet_data(self, packet_data):
|
|
"""Capture full packet data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
from datetime import datetime
|
|
|
|
# Ensure packet_data is a dict (might be passed as dict already)
|
|
if not isinstance(packet_data, dict):
|
|
packet_data = self._make_json_serializable(packet_data)
|
|
if not isinstance(packet_data, dict):
|
|
# If still not a dict, wrap it
|
|
packet_data = {'data': packet_data}
|
|
|
|
# Add hops field from path_len if not already present
|
|
# path_len represents the number of hops (each byte = 1 hop)
|
|
if 'hops' not in packet_data and 'path_len' in packet_data:
|
|
packet_data['hops'] = packet_data.get('path_len', 0)
|
|
elif 'hops' not in packet_data:
|
|
# If no path_len either, default to 0 hops
|
|
packet_data['hops'] = 0
|
|
|
|
# Add datetime for frontend display
|
|
if 'datetime' not in packet_data:
|
|
packet_data['datetime'] = datetime.now().isoformat()
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(packet_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'packet', "packet data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.warning(f"Error storing packet data for web viewer: {e}")
|
|
|
|
def capture_command(self, message, command_name, response, success, command_id=None):
|
|
"""Capture command data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
import time
|
|
|
|
# Extract data from message object
|
|
user = getattr(message, 'sender_id', 'Unknown')
|
|
channel = getattr(message, 'channel', 'Unknown')
|
|
user_input = getattr(message, 'content', f'/{command_name}')
|
|
|
|
# Get repeat information if transmission tracker is available
|
|
repeat_count = 0
|
|
repeater_prefixes = []
|
|
repeater_counts = {}
|
|
if (hasattr(self.bot, 'transmission_tracker') and
|
|
self.bot.transmission_tracker and
|
|
command_id):
|
|
repeat_info = self.bot.transmission_tracker.get_repeat_info(command_id=command_id)
|
|
repeat_count = repeat_info.get('repeat_count', 0)
|
|
repeater_prefixes = repeat_info.get('repeater_prefixes', [])
|
|
repeater_counts = repeat_info.get('repeater_counts', {})
|
|
|
|
# Construct command data structure
|
|
command_data = {
|
|
'user': user,
|
|
'channel': channel,
|
|
'command': command_name,
|
|
'user_input': user_input,
|
|
'response': response,
|
|
'success': success,
|
|
'timestamp': time.time(),
|
|
'repeat_count': repeat_count,
|
|
'repeater_prefixes': repeater_prefixes,
|
|
'repeater_counts': repeater_counts, # Count per repeater prefix
|
|
'command_id': command_id # Store command_id for later updates
|
|
}
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(command_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'command', "command data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing command data: {e}")
|
|
|
|
def capture_channel_message(self, message) -> None:
|
|
"""Capture an incoming channel or DM message for the web viewer live monitor."""
|
|
try:
|
|
import json
|
|
import time
|
|
data = {
|
|
'type': 'message',
|
|
'timestamp': time.time(),
|
|
'sender': getattr(message, 'sender_id', ''),
|
|
'channel': getattr(message, 'channel', ''),
|
|
'content': getattr(message, 'content', ''),
|
|
'snr': str(getattr(message, 'snr', '')),
|
|
'hops': getattr(message, 'hops', None),
|
|
'path': getattr(message, 'path', ''),
|
|
'is_dm': bool(getattr(message, 'is_dm', False)),
|
|
}
|
|
self._insert_packet_stream_row(json.dumps(data), 'message', "channel message")
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing channel message for web viewer: {e}")
|
|
|
|
def capture_packet_routing(self, routing_data):
|
|
"""Capture packet routing data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(routing_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'routing', "routing data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing routing data: {e}")
|
|
|
|
def cleanup_old_data(self, days_to_keep: Optional[int] = None):
|
|
"""Clean up old packet stream data to prevent database bloat.
|
|
Uses [Data_Retention] packet_stream_retention_days when days_to_keep is not provided."""
|
|
try:
|
|
import sqlite3
|
|
import time
|
|
|
|
if days_to_keep is None:
|
|
days_to_keep = 3
|
|
if self.bot.config.has_section('Data_Retention') and self.bot.config.has_option('Data_Retention', 'packet_stream_retention_days'):
|
|
with suppress(ValueError, TypeError):
|
|
days_to_keep = self.bot.config.getint('Data_Retention', 'packet_stream_retention_days')
|
|
|
|
cutoff_time = time.time() - (days_to_keep * 24 * 60 * 60)
|
|
|
|
db_path = self._get_web_viewer_db_path()
|
|
with closing(sqlite3.connect(str(db_path), timeout=60.0)) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Clean up old packet stream data
|
|
cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,))
|
|
deleted_count = cursor.rowcount
|
|
|
|
conn.commit()
|
|
|
|
if deleted_count > 0:
|
|
self.bot.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.error(f"Error cleaning up old packet stream data: {e}")
|
|
|
|
def _make_json_serializable(self, obj, depth=0, max_depth=3):
|
|
"""Convert non-JSON-serializable objects to strings with depth limiting"""
|
|
if depth > max_depth:
|
|
return str(obj)
|
|
|
|
# Handle basic types first
|
|
if obj is None or isinstance(obj, (str, int, float, bool)):
|
|
return obj
|
|
elif isinstance(obj, (list, tuple)):
|
|
return [self._make_json_serializable(item, depth + 1) for item in obj]
|
|
elif isinstance(obj, dict):
|
|
return {k: self._make_json_serializable(v, depth + 1) for k, v in obj.items()}
|
|
elif hasattr(obj, 'name'): # Enum-like objects
|
|
return obj.name
|
|
elif hasattr(obj, 'value'): # Enum values
|
|
return obj.value
|
|
elif hasattr(obj, '__dict__'):
|
|
# Convert objects to dict, but limit depth
|
|
try:
|
|
return {k: self._make_json_serializable(v, depth + 1) for k, v in obj.__dict__.items()}
|
|
except (RecursionError, RuntimeError):
|
|
return str(obj)
|
|
else:
|
|
return str(obj)
|
|
|
|
def send_mesh_edge_update(self, edge_data):
|
|
"""Send mesh edge update to web viewer via HTTP API"""
|
|
try:
|
|
if self._should_skip_web_viewer_send():
|
|
return
|
|
# Get web viewer URL from config
|
|
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
|
|
url = f"http://{host}:{port}/api/stream_data"
|
|
|
|
payload = {
|
|
'type': 'mesh_edge',
|
|
'data': edge_data
|
|
}
|
|
|
|
# Use session with connection pooling if available, otherwise fallback to requests.post
|
|
headers = {
|
|
'X-Stream-Token': self._stream_token,
|
|
'X-Requested-With': 'BotIntegration',
|
|
}
|
|
if self.http_session:
|
|
try:
|
|
self.http_session.post(url, json=payload, timeout=1.0)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
else:
|
|
import requests
|
|
try:
|
|
requests.post(url, json=payload, timeout=1.0, headers=headers)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error sending mesh edge update to web viewer: {e}")
|
|
|
|
def send_mesh_node_update(self, node_data):
|
|
"""Send mesh node update to web viewer via HTTP API"""
|
|
try:
|
|
if self._should_skip_web_viewer_send():
|
|
return
|
|
import requests
|
|
|
|
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
|
|
url = f"http://{host}:{port}/api/stream_data"
|
|
|
|
payload = {
|
|
'type': 'mesh_node',
|
|
'data': node_data
|
|
}
|
|
|
|
headers = {
|
|
'X-Stream-Token': self._stream_token,
|
|
'X-Requested-With': 'BotIntegration',
|
|
}
|
|
try:
|
|
requests.post(url, json=payload, timeout=0.5, headers=headers)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error sending mesh node update to web viewer: {e}")
|
|
|
|
def shutdown(self):
|
|
"""Mark as shutting down, stop drain thread, flush remaining rows, and close HTTP session."""
|
|
self.is_shutting_down = True
|
|
# Stop drain thread and do a final flush of any queued rows
|
|
self._drain_stop.set()
|
|
if self._drain_thread and self._drain_thread.is_alive():
|
|
self._drain_thread.join(timeout=5.0)
|
|
self._flush_write_queue()
|
|
# Close HTTP session to clean up connections
|
|
if hasattr(self, 'http_session') and self.http_session:
|
|
with suppress(Exception):
|
|
self.http_session.close()
|
|
|
|
class WebViewerIntegration:
|
|
"""Integration class for starting/stopping the web viewer with the bot"""
|
|
|
|
# Whitelist of allowed host bindings for security
|
|
ALLOWED_HOSTS = ['127.0.0.1', 'localhost', '0.0.0.0']
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.logger = bot.logger
|
|
self.viewer_process = None
|
|
self.viewer_thread = None
|
|
self.running = False
|
|
|
|
# File handles for subprocess stdout/stderr (for proper cleanup)
|
|
self._viewer_stdout_file = None
|
|
self._viewer_stderr_file = None
|
|
|
|
# Get web viewer settings from config
|
|
self.enabled = bot.config.getboolean('Web_Viewer', 'enabled', fallback=False)
|
|
self.host = bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
self.port = bot.config.getint('Web_Viewer', 'port', fallback=8080) # Web viewer uses 8080
|
|
self.debug = bot.config.getboolean('Web_Viewer', 'debug', fallback=False)
|
|
self.auto_start = bot.config.getboolean('Web_Viewer', 'auto_start', fallback=False)
|
|
|
|
# Validate configuration for security
|
|
self._validate_config()
|
|
|
|
# Process monitoring
|
|
self.restart_count = 0
|
|
self.max_restarts = 5
|
|
self.last_restart = 0
|
|
|
|
# Initialize bot integration for compatibility
|
|
self.bot_integration = BotIntegration(bot)
|
|
|
|
if self.enabled and self.auto_start:
|
|
self.start_viewer()
|
|
|
|
def _validate_config(self):
|
|
"""Validate web viewer configuration for security"""
|
|
# Validate host against whitelist
|
|
if self.host not in self.ALLOWED_HOSTS:
|
|
raise ValueError(
|
|
f"Invalid host configuration: {self.host}. "
|
|
f"Allowed hosts: {', '.join(self.ALLOWED_HOSTS)}"
|
|
)
|
|
|
|
# Validate port range (avoid privileged ports)
|
|
if not isinstance(self.port, int) or not (1024 <= self.port <= 65535):
|
|
raise ValueError(
|
|
f"Port must be between 1024-65535 (non-privileged), got: {self.port}"
|
|
)
|
|
|
|
# Enforce authentication when binding to non-loopback interfaces
|
|
if self.host == '0.0.0.0':
|
|
password = self.bot.config.get('Web_Viewer', 'web_viewer_password', fallback='').strip()
|
|
if not password:
|
|
raise ValueError(
|
|
"web_viewer_password must be set in [Web_Viewer] config when "
|
|
"binding to all interfaces (host = 0.0.0.0). "
|
|
"For unauthenticated local-only access, use host = 127.0.0.1."
|
|
)
|
|
|
|
def start_viewer(self):
|
|
"""Start the web viewer in a separate thread"""
|
|
if self.running:
|
|
self.logger.warning("Web viewer is already running")
|
|
return
|
|
|
|
try:
|
|
# Start the web viewer
|
|
self.viewer_thread = threading.Thread(target=self._run_viewer, daemon=True)
|
|
self.viewer_thread.start()
|
|
self.running = True
|
|
self.logger.info(f"Web viewer started on http://{self.host}:{self.port}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to start web viewer: {e}")
|
|
|
|
def stop_viewer(self):
|
|
"""Stop the web viewer"""
|
|
if not self.running and not self.viewer_process:
|
|
return
|
|
|
|
try:
|
|
self.running = False
|
|
|
|
if self.viewer_process and self.viewer_process.poll() is None:
|
|
self.logger.info("Stopping web viewer...")
|
|
try:
|
|
# First try graceful termination
|
|
self.viewer_process.terminate()
|
|
self.viewer_process.wait(timeout=5)
|
|
self.logger.info("Web viewer stopped gracefully")
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning("Web viewer did not stop gracefully, forcing termination")
|
|
try:
|
|
self.viewer_process.kill()
|
|
self.viewer_process.wait(timeout=2)
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.error("Failed to kill web viewer process")
|
|
except Exception as e:
|
|
self.logger.warning(f"Error during forced termination: {e}")
|
|
except Exception as e:
|
|
self.logger.warning(f"Error during web viewer shutdown: {e}")
|
|
finally:
|
|
self.viewer_process = None
|
|
|
|
# Close log file handles
|
|
if self._viewer_stdout_file:
|
|
try:
|
|
self._viewer_stdout_file.close()
|
|
except Exception as e:
|
|
self.logger.debug(f"Error closing stdout file: {e}")
|
|
finally:
|
|
self._viewer_stdout_file = None
|
|
|
|
if self._viewer_stderr_file:
|
|
try:
|
|
self._viewer_stderr_file.close()
|
|
except Exception as e:
|
|
self.logger.debug(f"Error closing stderr file: {e}")
|
|
finally:
|
|
self._viewer_stderr_file = None
|
|
|
|
if not self.viewer_process:
|
|
self.logger.info("Web viewer already stopped")
|
|
|
|
# Additional cleanup: kill any remaining processes on the port
|
|
try:
|
|
result = subprocess.run(['lsof', '-ti', f':{self.port}'],
|
|
capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
pids = result.stdout.strip().split('\n')
|
|
for pid in pids:
|
|
pid = pid.strip()
|
|
if not pid:
|
|
continue
|
|
|
|
# Validate PID is numeric only (prevent injection)
|
|
if not re.match(r'^\d+$', pid):
|
|
self.logger.warning(f"Invalid PID format: {pid}, skipping")
|
|
continue
|
|
|
|
try:
|
|
pid_int = int(pid)
|
|
# Safety check: never kill system PIDs
|
|
if pid_int < 2:
|
|
self.logger.warning(f"Refusing to kill system PID: {pid}")
|
|
continue
|
|
|
|
subprocess.run(['kill', '-9', str(pid_int)], timeout=2)
|
|
self.logger.info(f"Killed remaining process {pid} on port {self.port}")
|
|
except (ValueError, subprocess.TimeoutExpired) as e:
|
|
self.logger.warning(f"Failed to kill process {pid}: {e}")
|
|
except Exception as e:
|
|
self.logger.debug(f"Port cleanup check failed: {e}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error stopping web viewer: {e}")
|
|
|
|
def _run_viewer(self):
|
|
"""Run the web viewer in a separate process"""
|
|
stdout_file = None
|
|
stderr_file = None
|
|
|
|
try:
|
|
# Get the path to the web viewer script
|
|
viewer_script = Path(__file__).parent / "app.py"
|
|
# Use same config as bot so viewer finds db_path, Greeter_Command, etc.
|
|
config_path = getattr(self.bot, 'config_file', 'config.ini')
|
|
config_path = str(Path(config_path).resolve()) if config_path else 'config.ini'
|
|
|
|
# Build command
|
|
cmd = [
|
|
sys.executable,
|
|
str(viewer_script),
|
|
"--config", config_path,
|
|
"--host", self.host,
|
|
"--port", str(self.port)
|
|
]
|
|
|
|
if self.debug:
|
|
cmd.append("--debug")
|
|
|
|
# Ensure logs directory exists
|
|
os.makedirs('logs', exist_ok=True)
|
|
|
|
# Open log files in write mode to prevent buffer blocking
|
|
# This fixes the issue where subprocess.PIPE buffers (~64KB) fill up
|
|
# after ~5 minutes and cause the subprocess to hang.
|
|
# Using 'w' mode (overwrite) instead of 'a' (append) since:
|
|
# - The web viewer already has proper logging to web_viewer_modern.log
|
|
# - stdout/stderr are mainly for immediate debugging
|
|
# - Prevents unbounded log file growth
|
|
stdout_file = open('logs/web_viewer_stdout.log', 'w')
|
|
stderr_file = open('logs/web_viewer_stderr.log', 'w')
|
|
|
|
# Store file handles for proper cleanup
|
|
self._viewer_stdout_file = stdout_file
|
|
self._viewer_stderr_file = stderr_file
|
|
|
|
# Start the viewer process with log file redirection
|
|
self.viewer_process = subprocess.Popen(
|
|
cmd,
|
|
stdout=stdout_file,
|
|
stderr=stderr_file,
|
|
text=True
|
|
)
|
|
|
|
# Give it a moment to start up
|
|
time.sleep(2)
|
|
|
|
# Check if it started successfully
|
|
if self.viewer_process and self.viewer_process.poll() is not None:
|
|
# Process failed immediately - read from log files for error reporting
|
|
stdout_file.flush()
|
|
stderr_file.flush()
|
|
|
|
# Read last few lines from stderr for error reporting
|
|
try:
|
|
stderr_file.close()
|
|
with open('logs/web_viewer_stderr.log') as f:
|
|
stderr_lines = f.readlines()[-20:] # Last 20 lines
|
|
stderr = ''.join(stderr_lines)
|
|
except Exception:
|
|
stderr = "Could not read stderr log"
|
|
|
|
# Read last few lines from stdout for error reporting
|
|
try:
|
|
stdout_file.close()
|
|
with open('logs/web_viewer_stdout.log') as f:
|
|
stdout_lines = f.readlines()[-20:] # Last 20 lines
|
|
stdout = ''.join(stdout_lines)
|
|
except Exception:
|
|
stdout = "Could not read stdout log"
|
|
|
|
self.logger.error(f"Web viewer failed to start. Return code: {self.viewer_process.returncode}")
|
|
if stderr and stderr.strip():
|
|
self.logger.error(f"Web viewer startup error: {stderr}")
|
|
if stdout and stdout.strip():
|
|
self.logger.error(f"Web viewer startup output: {stdout}")
|
|
|
|
self.viewer_process = None
|
|
self._viewer_stdout_file = None
|
|
self._viewer_stderr_file = None
|
|
return
|
|
|
|
# Web viewer is ready
|
|
self.logger.info("Web viewer integration ready for data streaming")
|
|
|
|
# Monitor the process
|
|
while self.running and self.viewer_process and self.viewer_process.poll() is None:
|
|
time.sleep(1)
|
|
|
|
# Process exited - read from log files for error reporting if needed
|
|
if self.viewer_process and self.viewer_process.returncode != 0:
|
|
stdout_file.flush()
|
|
stderr_file.flush()
|
|
|
|
# Read last few lines from stderr for error reporting
|
|
try:
|
|
stderr_file.close()
|
|
with open('logs/web_viewer_stderr.log') as f:
|
|
stderr_lines = f.readlines()[-20:] # Last 20 lines
|
|
stderr = ''.join(stderr_lines)
|
|
except Exception:
|
|
stderr = "Could not read stderr log"
|
|
|
|
# Close stdout file as well
|
|
with suppress(Exception):
|
|
stdout_file.close()
|
|
|
|
self.logger.error(f"Web viewer process exited with code {self.viewer_process.returncode}")
|
|
if stderr and stderr.strip():
|
|
self.logger.error(f"Web viewer stderr: {stderr}")
|
|
|
|
self._viewer_stdout_file = None
|
|
self._viewer_stderr_file = None
|
|
elif self.viewer_process and self.viewer_process.returncode == 0:
|
|
self.logger.info("Web viewer process exited normally")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error running web viewer: {e}")
|
|
# Close file handles on error
|
|
if stdout_file:
|
|
with suppress(Exception):
|
|
stdout_file.close()
|
|
if stderr_file:
|
|
with suppress(Exception):
|
|
stderr_file.close()
|
|
self._viewer_stdout_file = None
|
|
self._viewer_stderr_file = None
|
|
finally:
|
|
self.running = False
|
|
|
|
def get_status(self):
|
|
"""Get the current status of the web viewer"""
|
|
return {
|
|
'enabled': self.enabled,
|
|
'running': self.running,
|
|
'host': self.host,
|
|
'port': self.port,
|
|
'debug': self.debug,
|
|
'auto_start': self.auto_start,
|
|
'url': f"http://{self.host}:{self.port}" if self.running else None
|
|
}
|
|
|
|
def restart_viewer(self):
|
|
"""Restart the web viewer with rate limiting"""
|
|
current_time = time.time()
|
|
|
|
# Rate limit restarts to prevent restart loops
|
|
if current_time - self.last_restart < 30: # 30 seconds between restarts
|
|
self.logger.warning("Restart rate limited - too soon since last restart")
|
|
return
|
|
|
|
if self.restart_count >= self.max_restarts:
|
|
self.logger.error(f"Maximum restart limit reached ({self.max_restarts}). Web viewer disabled.")
|
|
self.enabled = False
|
|
return
|
|
|
|
self.restart_count += 1
|
|
self.last_restart = current_time
|
|
|
|
self.logger.info(f"Restarting web viewer (attempt {self.restart_count}/{self.max_restarts})...")
|
|
self.stop_viewer()
|
|
time.sleep(3) # Give it more time to stop
|
|
|
|
self.start_viewer()
|
|
|
|
def is_viewer_healthy(self):
|
|
"""Check if the web viewer process is healthy"""
|
|
if not self.viewer_process:
|
|
return False
|
|
|
|
# Check if process is still running
|
|
return self.viewer_process.poll() is None
|