mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-06-04 06:41:21 +00:00
1e6da4a08c
Updated the web viewer integration to utilize multiprocessing for running the viewer process, enhancing process management. Adjusted the logging configuration for better clarity and consistency. Removed deprecated timeout configurations and streamlined the shutdown process for the web viewer, ensuring graceful termination and improved error handling.
831 lines
35 KiB
Python
831 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web Viewer Integration for MeshCore Bot
|
|
Provides integration between the main bot and the web viewer
|
|
"""
|
|
|
|
import multiprocessing
|
|
import queue
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from contextlib import closing, suppress
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from modules.utils import resolve_path
|
|
|
|
|
|
def _run_viewer_process(config: str, host: str, port: int, debug: bool) -> None:
|
|
"""Target for multiprocessing.Process — must live at module level to be picklable."""
|
|
from web_viewer.app import BotDataViewer
|
|
viewer = BotDataViewer(config_path=config)
|
|
viewer.run(host=host, port=port, debug=debug)
|
|
|
|
|
|
def normalized_web_viewer_password(config) -> str:
|
|
"""Return the effective web viewer password, or '' to disable the login screen.
|
|
|
|
Blank values, quoted empties (e.g. INI ``""``), and placeholders ``none`` / ``null`` /
|
|
``nil`` (case-insensitive) are treated as no password.
|
|
"""
|
|
if not config.has_section("Web_Viewer"):
|
|
return ""
|
|
raw = config.get("Web_Viewer", "web_viewer_password", fallback="")
|
|
if raw is None:
|
|
return ""
|
|
s = str(raw).strip()
|
|
if not s:
|
|
return ""
|
|
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
|
|
s = s[1:-1].strip()
|
|
if not s:
|
|
return ""
|
|
if s.lower() in ("none", "null", "nil"):
|
|
return ""
|
|
return s
|
|
|
|
|
|
class BotIntegration:
|
|
"""Simple bot integration for web viewer compatibility"""
|
|
|
|
# After this many consecutive connection failures, stop sending until cooldown expires
|
|
CIRCUIT_BREAKER_THRESHOLD = 3
|
|
CIRCUIT_BREAKER_COOLDOWN_SEC = 60
|
|
|
|
# How often (seconds) the drain thread flushes the write queue
|
|
DRAIN_INTERVAL = 0.5
|
|
|
|
# Bounded packet_stream write queue: producers block up to this long before drop/retry.
|
|
_WRITE_QUEUE_PUT_TIMEOUT_SEC = 5.0
|
|
EDGE_POST_TIMEOUT_SEC = 1.0
|
|
NODE_POST_TIMEOUT_SEC = 0.5
|
|
SQLITE_CONNECT_TIMEOUT_SEC = 60.0
|
|
REQUEUE_PUT_TIMEOUT_SEC = 60.0
|
|
SHUTDOWN_JOIN_TIMEOUT_SEC = 5.0
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.circuit_breaker_open = False
|
|
self.circuit_breaker_failures = 0
|
|
self.circuit_breaker_last_failure_time = 0.0
|
|
self.is_shutting_down = False
|
|
# Serialize flushes so drain thread, shutdown, and producer retry cannot interleave.
|
|
self._flush_lock = threading.Lock()
|
|
maxsize = 1000
|
|
if self.bot.config.has_section("Web_Viewer"):
|
|
try:
|
|
maxsize = self.bot.config.getint(
|
|
"Web_Viewer", "packet_stream_write_queue_max", fallback=1000
|
|
)
|
|
except (ValueError, TypeError, OSError):
|
|
maxsize = 1000
|
|
self._write_queue_maxsize = max(1, int(maxsize))
|
|
# Batched write queue: avoids a per-insert sqlite3.connect() round-trip (bounded for RAM).
|
|
self._write_queue: queue.Queue = queue.Queue(maxsize=self._write_queue_maxsize)
|
|
self._drain_stop = threading.Event()
|
|
self._drain_thread: Optional[threading.Thread] = None
|
|
self._load_timeouts_from_config()
|
|
# Initialize HTTP session with connection pooling for efficient reuse
|
|
self._init_http_session()
|
|
# Generate a shared secret for authenticating internal /api/stream_data calls.
|
|
# Stored in DB metadata so the web viewer can validate it.
|
|
self._stream_token = secrets.token_hex(32)
|
|
try:
|
|
self.bot.db_manager.set_metadata('internal.stream_token', self._stream_token)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Could not persist stream token: {e}")
|
|
if getattr(self, 'http_session', None):
|
|
self.http_session.headers['X-Stream-Token'] = self._stream_token
|
|
# Start background drain thread after table is confirmed to exist
|
|
self._start_drain_thread()
|
|
|
|
def _get_float_config(self, key: str, fallback: float) -> float:
|
|
"""Read float from [Web_Viewer] config with sane fallback."""
|
|
try:
|
|
value = self.bot.config.getfloat("Web_Viewer", key, fallback=fallback)
|
|
return value if value > 0 else fallback
|
|
except (ValueError, TypeError, OSError):
|
|
return fallback
|
|
|
|
def _load_timeouts_from_config(self) -> None:
|
|
"""Load optional BotIntegration timeout settings from config."""
|
|
self.edge_post_timeout_sec = self._get_float_config(
|
|
"edge_post_timeout_sec", self.EDGE_POST_TIMEOUT_SEC
|
|
)
|
|
self.node_post_timeout_sec = self._get_float_config(
|
|
"node_post_timeout_sec", self.NODE_POST_TIMEOUT_SEC
|
|
)
|
|
self.sqlite_connect_timeout_sec = self._get_float_config(
|
|
"sqlite_connect_timeout_sec", self.SQLITE_CONNECT_TIMEOUT_SEC
|
|
)
|
|
self.requeue_put_timeout_sec = self._get_float_config(
|
|
"requeue_put_timeout_sec", self.REQUEUE_PUT_TIMEOUT_SEC
|
|
)
|
|
self.shutdown_join_timeout_sec = self._get_float_config(
|
|
"integration_shutdown_join_timeout_sec",
|
|
self.SHUTDOWN_JOIN_TIMEOUT_SEC,
|
|
)
|
|
|
|
def _init_http_session(self):
|
|
"""Initialize a requests.Session with connection pooling and keep-alive"""
|
|
try:
|
|
import logging
|
|
|
|
import requests
|
|
import urllib3
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
# Suppress urllib3 connection pool messages when web viewer is unreachable
|
|
# Connection refused / Retrying WARNINGs would flood logs during routing bursts
|
|
urllib3_logger = logging.getLogger('urllib3.connectionpool')
|
|
urllib3_logger.setLevel(logging.ERROR)
|
|
|
|
# Also disable other urllib3 warnings
|
|
urllib3.disable_warnings(urllib3.exceptions.NotOpenSSLWarning)
|
|
|
|
self.http_session = requests.Session()
|
|
|
|
# Configure retry strategy
|
|
retry_strategy = Retry(
|
|
total=2,
|
|
backoff_factor=0.1,
|
|
status_forcelist=[429, 500, 502, 503, 504],
|
|
)
|
|
|
|
# Mount adapter with connection pooling
|
|
# pool_block=False allows non-blocking behavior if pool is full
|
|
adapter = HTTPAdapter(
|
|
pool_connections=1, # Single connection pool for web viewer
|
|
pool_maxsize=5, # Allow up to 5 connections in the pool
|
|
max_retries=retry_strategy,
|
|
pool_block=False # Don't block if pool is full
|
|
)
|
|
self.http_session.mount("http://", adapter)
|
|
self.http_session.mount("https://", adapter)
|
|
|
|
# Set default headers for keep-alive and internal auth
|
|
self.http_session.headers.update({
|
|
'Connection': 'keep-alive',
|
|
'X-Requested-With': 'BotIntegration', # CSRF bypass for internal calls
|
|
})
|
|
except ImportError:
|
|
# Fallback if requests is not available
|
|
self.http_session = None
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error initializing HTTP session: {e}")
|
|
self.http_session = None
|
|
|
|
def reset_circuit_breaker(self):
|
|
"""Reset the circuit breaker"""
|
|
self.circuit_breaker_open = False
|
|
self.circuit_breaker_failures = 0
|
|
|
|
def _should_skip_web_viewer_send(self):
|
|
"""Return True if we should skip sending (circuit open and within cooldown)."""
|
|
if not self.circuit_breaker_open:
|
|
return False
|
|
if (time.time() - self.circuit_breaker_last_failure_time) >= self.CIRCUIT_BREAKER_COOLDOWN_SEC:
|
|
self.reset_circuit_breaker()
|
|
return False
|
|
return True
|
|
|
|
def _record_web_viewer_result(self, success):
|
|
"""Update circuit breaker state after a send attempt."""
|
|
if success:
|
|
self.reset_circuit_breaker()
|
|
else:
|
|
self.circuit_breaker_failures += 1
|
|
self.circuit_breaker_last_failure_time = time.time()
|
|
if self.circuit_breaker_failures >= self.CIRCUIT_BREAKER_THRESHOLD:
|
|
self.circuit_breaker_open = True
|
|
self.bot.logger.debug(
|
|
"Web viewer unreachable after %d failures; circuit open for %ds",
|
|
self.circuit_breaker_failures,
|
|
self.CIRCUIT_BREAKER_COOLDOWN_SEC,
|
|
)
|
|
|
|
def _get_web_viewer_db_path(self):
|
|
"""Return resolved database path for web viewer. Uses [Bot] db_path when [Web_Viewer] db_path is unset."""
|
|
base_dir = self.bot.bot_root if hasattr(self.bot, 'bot_root') else '.'
|
|
if self.bot.config.has_section('Web_Viewer') and self.bot.config.has_option('Web_Viewer', 'db_path'):
|
|
raw = self.bot.config.get('Web_Viewer', 'db_path', fallback='').strip()
|
|
if raw:
|
|
return resolve_path(raw, base_dir)
|
|
return str(Path(self.bot.db_manager.db_path).resolve())
|
|
|
|
def _init_packet_stream_table(self):
|
|
"""Backward-compatible initializer (now handled by migrations).
|
|
|
|
Kept for older call sites and tests that patch this method. Safe to call
|
|
multiple times and safe to ignore failures.
|
|
"""
|
|
try:
|
|
import sqlite3
|
|
|
|
db_path = self._get_web_viewer_db_path()
|
|
with closing(sqlite3.connect(str(db_path), timeout=self.sqlite_connect_timeout_sec)) as conn:
|
|
cur = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='packet_stream'"
|
|
)
|
|
if cur.fetchone() is not None:
|
|
return
|
|
try:
|
|
foreign_keys = self.bot.config.getboolean("Web_Viewer", "sqlite_foreign_keys", fallback=True)
|
|
busy_timeout_ms = self.bot.config.getint("Web_Viewer", "sqlite_busy_timeout_ms", fallback=60000)
|
|
journal_mode = self.bot.config.get("Web_Viewer", "sqlite_journal_mode", fallback="WAL").strip() or "WAL"
|
|
except Exception:
|
|
foreign_keys = True
|
|
busy_timeout_ms = 60000
|
|
journal_mode = "WAL"
|
|
conn.execute(f"PRAGMA foreign_keys={'ON' if foreign_keys else 'OFF'}")
|
|
conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
|
|
try:
|
|
conn.execute(f"PRAGMA journal_mode={journal_mode}")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS packet_stream (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
data TEXT NOT NULL,
|
|
type TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packet_stream_timestamp ON packet_stream(timestamp)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packet_stream_type ON packet_stream(type)"
|
|
)
|
|
conn.commit()
|
|
except Exception:
|
|
pass
|
|
|
|
def _start_drain_thread(self) -> None:
|
|
"""Start the background thread that flushes the write queue every DRAIN_INTERVAL seconds."""
|
|
self._drain_thread = threading.Thread(
|
|
target=self._drain_loop,
|
|
name="packet-stream-drain",
|
|
daemon=True,
|
|
)
|
|
self._drain_thread.start()
|
|
|
|
def _drain_loop(self) -> None:
|
|
"""Background loop: flush the write queue every DRAIN_INTERVAL seconds."""
|
|
while not self._drain_stop.is_set():
|
|
self._drain_stop.wait(timeout=self.DRAIN_INTERVAL)
|
|
self._flush_write_queue()
|
|
|
|
def _requeue_rows(self, rows: list[tuple[float, str, str]]) -> None:
|
|
"""Restore rows to the queue after a failed flush (FIFO). Logs if the queue stays full."""
|
|
for i, row in enumerate(rows):
|
|
try:
|
|
self._write_queue.put(row, timeout=self.requeue_put_timeout_sec)
|
|
except queue.Full:
|
|
remaining = len(rows) - i
|
|
self.bot.logger.error(
|
|
"packet_stream: could not re-queue %d row(s) after flush failure; data may be lost",
|
|
remaining,
|
|
)
|
|
break
|
|
|
|
def _flush_write_queue(self) -> None:
|
|
"""Drain all queued rows and insert them in a single batched transaction."""
|
|
import sqlite3
|
|
|
|
with self._flush_lock:
|
|
if self._write_queue.empty():
|
|
return
|
|
rows: list[tuple[float, str, str]] = []
|
|
while not self._write_queue.empty():
|
|
try:
|
|
rows.append(self._write_queue.get_nowait())
|
|
except queue.Empty:
|
|
break
|
|
if not rows:
|
|
return
|
|
db_path = self._get_web_viewer_db_path()
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
with closing(sqlite3.connect(str(db_path), timeout=self.sqlite_connect_timeout_sec)) as conn:
|
|
conn.executemany(
|
|
'INSERT INTO packet_stream (timestamp, data, type) VALUES (?, ?, ?)',
|
|
rows,
|
|
)
|
|
conn.commit()
|
|
return
|
|
except sqlite3.OperationalError as e:
|
|
if "locked" in str(e).lower() and attempt < max_retries - 1:
|
|
time.sleep(0.15 * (attempt + 1))
|
|
continue
|
|
self.bot.logger.warning(
|
|
f"Error flushing packet_stream queue ({len(rows)} rows): {e}"
|
|
)
|
|
self._requeue_rows(rows)
|
|
return
|
|
except Exception as e:
|
|
self.bot.logger.warning(
|
|
f"Error flushing packet_stream queue ({len(rows)} rows): {e}"
|
|
)
|
|
self._requeue_rows(rows)
|
|
return
|
|
|
|
def _insert_packet_stream_row(self, data_json: str, row_type: str, log_prefix: str = "packet data"):
|
|
"""Queue one row for batched insertion into packet_stream by the drain thread."""
|
|
item = (time.time(), data_json, row_type)
|
|
try:
|
|
self._write_queue.put(item, timeout=self._WRITE_QUEUE_PUT_TIMEOUT_SEC)
|
|
except queue.Full:
|
|
try:
|
|
self._flush_write_queue()
|
|
except Exception as e:
|
|
self.bot.logger.debug("packet_stream flush after full queue: %s", e)
|
|
try:
|
|
self._write_queue.put(item, timeout=self._WRITE_QUEUE_PUT_TIMEOUT_SEC)
|
|
except queue.Full:
|
|
self.bot.logger.warning(
|
|
"packet_stream write queue full (%s/%s items) after flush retry; dropping %s",
|
|
self._write_queue.qsize(),
|
|
self._write_queue.maxsize,
|
|
log_prefix,
|
|
)
|
|
except Exception as e:
|
|
self.bot.logger.warning(f"Error queuing {log_prefix} for web viewer: {e}")
|
|
|
|
def capture_full_packet_data(self, packet_data):
|
|
"""Capture full packet data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
from datetime import datetime
|
|
|
|
# Ensure packet_data is a dict (might be passed as dict already)
|
|
if not isinstance(packet_data, dict):
|
|
packet_data = self._make_json_serializable(packet_data)
|
|
if not isinstance(packet_data, dict):
|
|
# If still not a dict, wrap it
|
|
packet_data = {'data': packet_data}
|
|
|
|
# Add hops field from path_len if not already present
|
|
# path_len represents the number of hops (each byte = 1 hop)
|
|
if 'hops' not in packet_data and 'path_len' in packet_data:
|
|
packet_data['hops'] = packet_data.get('path_len', 0)
|
|
elif 'hops' not in packet_data:
|
|
# If no path_len either, default to 0 hops
|
|
packet_data['hops'] = 0
|
|
|
|
# Add datetime for frontend display
|
|
if 'datetime' not in packet_data:
|
|
packet_data['datetime'] = datetime.now().isoformat()
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(packet_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'packet', "packet data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.warning(f"Error storing packet data for web viewer: {e}")
|
|
|
|
def capture_command(self, message, command_name, response, success, command_id=None):
|
|
"""Capture command data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
import time
|
|
|
|
# Extract data from message object
|
|
user = getattr(message, 'sender_id', 'Unknown')
|
|
channel = getattr(message, 'channel', 'Unknown')
|
|
user_input = getattr(message, 'content', f'/{command_name}')
|
|
|
|
# Get repeat information if transmission tracker is available
|
|
repeat_count = 0
|
|
repeater_prefixes = []
|
|
repeater_counts = {}
|
|
if (hasattr(self.bot, 'transmission_tracker') and
|
|
self.bot.transmission_tracker and
|
|
command_id):
|
|
repeat_info = self.bot.transmission_tracker.get_repeat_info(command_id=command_id)
|
|
repeat_count = repeat_info.get('repeat_count', 0)
|
|
repeater_prefixes = repeat_info.get('repeater_prefixes', [])
|
|
repeater_counts = repeat_info.get('repeater_counts', {})
|
|
|
|
# Construct command data structure
|
|
command_data = {
|
|
'user': user,
|
|
'channel': channel,
|
|
'command': command_name,
|
|
'user_input': user_input,
|
|
'response': response,
|
|
'success': success,
|
|
'timestamp': time.time(),
|
|
'repeat_count': repeat_count,
|
|
'repeater_prefixes': repeater_prefixes,
|
|
'repeater_counts': repeater_counts, # Count per repeater prefix
|
|
'command_id': command_id # Store command_id for later updates
|
|
}
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(command_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'command', "command data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing command data: {e}")
|
|
|
|
def capture_channel_message(self, message) -> None:
|
|
"""Capture an incoming channel or DM message for the web viewer live monitor."""
|
|
try:
|
|
import json
|
|
import time
|
|
data = {
|
|
'type': 'message',
|
|
'timestamp': time.time(),
|
|
'sender': getattr(message, 'sender_id', ''),
|
|
'channel': getattr(message, 'channel', ''),
|
|
'content': getattr(message, 'content', ''),
|
|
'snr': str(getattr(message, 'snr', '')),
|
|
'hops': getattr(message, 'hops', None),
|
|
'path': getattr(message, 'path', ''),
|
|
'is_dm': bool(getattr(message, 'is_dm', False)),
|
|
}
|
|
self._insert_packet_stream_row(json.dumps(data), 'message', "channel message")
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing channel message for web viewer: {e}")
|
|
|
|
def capture_packet_routing(self, routing_data):
|
|
"""Capture packet routing data and store in database for web viewer"""
|
|
try:
|
|
import json
|
|
|
|
# Convert non-serializable objects to strings
|
|
serializable_data = self._make_json_serializable(routing_data)
|
|
|
|
# Store in database for web viewer to read (retries on database is locked)
|
|
self._insert_packet_stream_row(json.dumps(serializable_data), 'routing', "routing data")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error storing routing data: {e}")
|
|
|
|
def cleanup_old_data(self, days_to_keep: Optional[int] = None):
|
|
"""Clean up old packet stream data to prevent database bloat.
|
|
Uses [Data_Retention] packet_stream_retention_days when days_to_keep is not provided."""
|
|
try:
|
|
import sqlite3
|
|
import time
|
|
|
|
if days_to_keep is None:
|
|
days_to_keep = 3
|
|
if self.bot.config.has_section('Data_Retention') and self.bot.config.has_option('Data_Retention', 'packet_stream_retention_days'):
|
|
with suppress(ValueError, TypeError):
|
|
days_to_keep = self.bot.config.getint('Data_Retention', 'packet_stream_retention_days')
|
|
|
|
cutoff_time = time.time() - (days_to_keep * 24 * 60 * 60)
|
|
|
|
db_path = self._get_web_viewer_db_path()
|
|
with closing(sqlite3.connect(str(db_path), timeout=self.sqlite_connect_timeout_sec)) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Clean up old packet stream data
|
|
cursor.execute('DELETE FROM packet_stream WHERE timestamp < ?', (cutoff_time,))
|
|
deleted_count = cursor.rowcount
|
|
|
|
conn.commit()
|
|
|
|
if deleted_count > 0:
|
|
self.bot.logger.info(f"Cleaned up {deleted_count} old packet stream entries (older than {days_to_keep} days)")
|
|
|
|
except Exception as e:
|
|
self.bot.logger.error(f"Error cleaning up old packet stream data: {e}")
|
|
|
|
def _make_json_serializable(self, obj, depth=0, max_depth=3):
|
|
"""Convert non-JSON-serializable objects to strings with depth limiting"""
|
|
if depth > max_depth:
|
|
return str(obj)
|
|
|
|
# Handle basic types first
|
|
if obj is None or isinstance(obj, (str, int, float, bool)):
|
|
return obj
|
|
elif isinstance(obj, (list, tuple)):
|
|
return [self._make_json_serializable(item, depth + 1) for item in obj]
|
|
elif isinstance(obj, dict):
|
|
return {k: self._make_json_serializable(v, depth + 1) for k, v in obj.items()}
|
|
elif hasattr(obj, 'name'): # Enum-like objects
|
|
return obj.name
|
|
elif hasattr(obj, 'value'): # Enum values
|
|
return obj.value
|
|
elif hasattr(obj, '__dict__'):
|
|
# Convert objects to dict, but limit depth
|
|
try:
|
|
return {k: self._make_json_serializable(v, depth + 1) for k, v in obj.__dict__.items()}
|
|
except (RecursionError, RuntimeError):
|
|
return str(obj)
|
|
else:
|
|
return str(obj)
|
|
|
|
def send_mesh_edge_update(self, edge_data):
|
|
"""Send mesh edge update to web viewer via HTTP API"""
|
|
try:
|
|
if self._should_skip_web_viewer_send():
|
|
return
|
|
# Get web viewer URL from config
|
|
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
|
|
url = f"http://{host}:{port}/api/stream_data"
|
|
|
|
payload = {
|
|
'type': 'mesh_edge',
|
|
'data': edge_data
|
|
}
|
|
|
|
# Use session with connection pooling if available, otherwise fallback to requests.post
|
|
headers = {
|
|
'X-Stream-Token': self._stream_token,
|
|
'X-Requested-With': 'BotIntegration',
|
|
}
|
|
if self.http_session:
|
|
try:
|
|
self.http_session.post(url, json=payload, timeout=self.edge_post_timeout_sec)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
else:
|
|
import requests
|
|
try:
|
|
requests.post(url, json=payload, timeout=self.edge_post_timeout_sec, headers=headers)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error sending mesh edge update to web viewer: {e}")
|
|
|
|
def send_mesh_node_update(self, node_data):
|
|
"""Send mesh node update to web viewer via HTTP API"""
|
|
try:
|
|
if self._should_skip_web_viewer_send():
|
|
return
|
|
import requests
|
|
|
|
host = self.bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
port = self.bot.config.getint('Web_Viewer', 'port', fallback=8080)
|
|
url = f"http://{host}:{port}/api/stream_data"
|
|
|
|
payload = {
|
|
'type': 'mesh_node',
|
|
'data': node_data
|
|
}
|
|
|
|
headers = {
|
|
'X-Stream-Token': self._stream_token,
|
|
'X-Requested-With': 'BotIntegration',
|
|
}
|
|
try:
|
|
requests.post(url, json=payload, timeout=self.node_post_timeout_sec, headers=headers)
|
|
self._record_web_viewer_result(True)
|
|
except Exception:
|
|
self._record_web_viewer_result(False)
|
|
except Exception as e:
|
|
self.bot.logger.debug(f"Error sending mesh node update to web viewer: {e}")
|
|
|
|
def shutdown(self):
|
|
"""Mark as shutting down, stop drain thread, flush remaining rows, and close HTTP session."""
|
|
self.is_shutting_down = True
|
|
# Stop drain thread and do a final flush of any queued rows
|
|
self._drain_stop.set()
|
|
if self._drain_thread and self._drain_thread.is_alive():
|
|
self._drain_thread.join(timeout=self.shutdown_join_timeout_sec)
|
|
self._flush_write_queue()
|
|
# Close HTTP session to clean up connections
|
|
if hasattr(self, 'http_session') and self.http_session:
|
|
with suppress(Exception):
|
|
self.http_session.close()
|
|
|
|
class WebViewerIntegration:
|
|
"""Integration class for starting/stopping the web viewer with the bot"""
|
|
|
|
# Whitelist of allowed host bindings for security
|
|
ALLOWED_HOSTS = ['127.0.0.1', 'localhost', '0.0.0.0']
|
|
VIEWER_STOP_GRACE_TIMEOUT_SEC = 5.0
|
|
VIEWER_STOP_FORCE_TIMEOUT_SEC = 2.0
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.logger = bot.logger
|
|
self.viewer_process = None
|
|
self.viewer_thread = None
|
|
self.running = False
|
|
|
|
# Get web viewer settings from config
|
|
self.enabled = bot.config.getboolean('Web_Viewer', 'enabled', fallback=False)
|
|
self.host = bot.config.get('Web_Viewer', 'host', fallback='127.0.0.1')
|
|
self.port = bot.config.getint('Web_Viewer', 'port', fallback=8080) # Web viewer uses 8080
|
|
self.debug = bot.config.getboolean('Web_Viewer', 'debug', fallback=False)
|
|
self.auto_start = bot.config.getboolean('Web_Viewer', 'auto_start', fallback=False)
|
|
self.viewer_stop_grace_timeout_sec = self._get_float_config(
|
|
"viewer_stop_grace_timeout_sec",
|
|
self.VIEWER_STOP_GRACE_TIMEOUT_SEC,
|
|
)
|
|
self.viewer_stop_force_timeout_sec = self._get_float_config(
|
|
"viewer_stop_force_timeout_sec",
|
|
self.VIEWER_STOP_FORCE_TIMEOUT_SEC,
|
|
)
|
|
|
|
# Validate configuration for security
|
|
self._validate_config()
|
|
|
|
# Process monitoring
|
|
self.restart_count = 0
|
|
self.max_restarts = 5
|
|
self.last_restart = 0
|
|
# True while stop_viewer() is tearing down; blocks accidental restart during bot shutdown
|
|
self.shutting_down = False
|
|
|
|
# Initialize bot integration for compatibility
|
|
self.bot_integration = BotIntegration(bot)
|
|
|
|
if self.enabled and self.auto_start:
|
|
self.start_viewer()
|
|
|
|
def _get_float_config(self, key: str, fallback: float) -> float:
|
|
"""Read float timeout from [Web_Viewer] with positive-value guard."""
|
|
try:
|
|
value = self.bot.config.getfloat("Web_Viewer", key, fallback=fallback)
|
|
return value if value > 0 else fallback
|
|
except (ValueError, TypeError, OSError):
|
|
return fallback
|
|
|
|
def _validate_config(self):
|
|
"""Validate web viewer configuration for security"""
|
|
# Validate host against whitelist
|
|
if self.host not in self.ALLOWED_HOSTS:
|
|
raise ValueError(
|
|
f"Invalid host configuration: {self.host}. "
|
|
f"Allowed hosts: {', '.join(self.ALLOWED_HOSTS)}"
|
|
)
|
|
|
|
# Validate port range (avoid privileged ports)
|
|
if not isinstance(self.port, int) or not (1024 <= self.port <= 65535):
|
|
raise ValueError(
|
|
f"Port must be between 1024-65535 (non-privileged), got: {self.port}"
|
|
)
|
|
|
|
# Insecure but allowed: binding to all interfaces without a password.
|
|
# Only warn when the web viewer is configured to run at startup.
|
|
if self.enabled and self.host == "0.0.0.0":
|
|
if not normalized_web_viewer_password(self.bot.config):
|
|
self.logger.error(
|
|
"Web viewer is configured with host = 0.0.0.0 and no "
|
|
"web_viewer_password (or password is empty/null); the UI is reachable "
|
|
"from the network without authentication. Set web_viewer_password or use "
|
|
"host = 127.0.0.1 for local-only access."
|
|
)
|
|
|
|
def start_viewer(self):
|
|
"""Start the web viewer in a separate thread"""
|
|
if self.running:
|
|
self.logger.warning("Web viewer is already running")
|
|
return
|
|
|
|
# Intentional (re)start after stop_viewer / restart_viewer — allow monitor thread to run
|
|
self.shutting_down = False
|
|
|
|
try:
|
|
# Start the web viewer
|
|
self.viewer_thread = threading.Thread(target=self._run_viewer, daemon=True)
|
|
self.viewer_thread.start()
|
|
self.running = True
|
|
self.logger.info(f"Web viewer started on http://{self.host}:{self.port}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to start web viewer: {e}")
|
|
|
|
def stop_viewer(self):
|
|
"""Stop the web viewer process."""
|
|
if not self.running and not self.viewer_process:
|
|
return
|
|
|
|
try:
|
|
self.shutting_down = True
|
|
self.running = False
|
|
|
|
if self.viewer_process and self.viewer_process.is_alive():
|
|
self.logger.info("Stopping web viewer...")
|
|
self.viewer_process.terminate()
|
|
self.viewer_process.join(timeout=self.viewer_stop_grace_timeout_sec)
|
|
if self.viewer_process.is_alive():
|
|
self.logger.warning("Web viewer did not stop gracefully, forcing termination")
|
|
self.viewer_process.kill()
|
|
self.viewer_process.join(timeout=self.viewer_stop_force_timeout_sec)
|
|
self.logger.info("Web viewer stopped")
|
|
else:
|
|
self.logger.info("Web viewer already stopped")
|
|
|
|
self.viewer_process = None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error stopping web viewer: {e}")
|
|
|
|
def _run_viewer(self):
|
|
"""Run the web viewer as a child process via multiprocessing."""
|
|
try:
|
|
config_path = getattr(self.bot, 'config_file', 'config.ini')
|
|
config_path = str(Path(config_path).resolve()) if config_path else 'config.ini'
|
|
|
|
self.viewer_process = multiprocessing.Process(
|
|
target=_run_viewer_process,
|
|
kwargs={'config': config_path, 'host': self.host, 'port': self.port, 'debug': self.debug},
|
|
name='web-viewer',
|
|
daemon=True,
|
|
)
|
|
self.viewer_process.start()
|
|
|
|
# Brief pause to detect immediate startup failures
|
|
time.sleep(2)
|
|
|
|
if not self.viewer_process.is_alive():
|
|
self.logger.error(
|
|
"Web viewer failed to start (exit code %s)", self.viewer_process.exitcode
|
|
)
|
|
self.viewer_process = None
|
|
return
|
|
|
|
self.logger.info("Web viewer integration ready for data streaming")
|
|
|
|
while self.running and self.viewer_process and self.viewer_process.is_alive():
|
|
time.sleep(1)
|
|
|
|
if self.running and self.viewer_process and not self.viewer_process.is_alive():
|
|
if (
|
|
self.shutting_down
|
|
or self.bot._shutdown_event.is_set()
|
|
or not getattr(self.bot, "connected", True)
|
|
):
|
|
self.logger.debug(
|
|
"Web viewer process exited (code %s) during bot shutdown; not restarting",
|
|
self.viewer_process.exitcode,
|
|
)
|
|
return
|
|
self.logger.warning(
|
|
"Web viewer process exited unexpectedly (code %s) — attempting restart",
|
|
self.viewer_process.exitcode,
|
|
)
|
|
self.restart_viewer()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error running web viewer: {e}")
|
|
finally:
|
|
self.running = False
|
|
|
|
def get_status(self):
|
|
"""Get the current status of the web viewer"""
|
|
return {
|
|
'enabled': self.enabled,
|
|
'running': self.running,
|
|
'host': self.host,
|
|
'port': self.port,
|
|
'debug': self.debug,
|
|
'auto_start': self.auto_start,
|
|
'url': f"http://{self.host}:{self.port}" if self.running else None
|
|
}
|
|
|
|
def restart_viewer(self):
|
|
"""Restart the web viewer with rate limiting"""
|
|
if self.shutting_down:
|
|
return
|
|
if getattr(self.bot, "_shutdown_event", None) is not None and self.bot._shutdown_event.is_set():
|
|
return
|
|
if not getattr(self.bot, "connected", True):
|
|
return
|
|
|
|
current_time = time.time()
|
|
|
|
# Rate limit restarts to prevent restart loops
|
|
if current_time - self.last_restart < 30: # 30 seconds between restarts
|
|
self.logger.warning("Restart rate limited - too soon since last restart")
|
|
return
|
|
|
|
if self.restart_count >= self.max_restarts:
|
|
self.logger.error(f"Maximum restart limit reached ({self.max_restarts}). Web viewer disabled.")
|
|
self.enabled = False
|
|
return
|
|
|
|
self.restart_count += 1
|
|
self.last_restart = current_time
|
|
|
|
self.logger.info(f"Restarting web viewer (attempt {self.restart_count}/{self.max_restarts})...")
|
|
self.stop_viewer()
|
|
time.sleep(3) # Give it more time to stop
|
|
|
|
self.start_viewer()
|
|
|
|
def is_viewer_healthy(self):
|
|
"""Check if the web viewer process is healthy"""
|
|
if not self.viewer_process:
|
|
return False
|
|
return self.viewer_process.is_alive()
|