diff --git a/modules/core.py b/modules/core.py index 85edd09..dca5d82 100644 --- a/modules/core.py +++ b/modules/core.py @@ -1517,7 +1517,7 @@ class MeshCoreBot: # Check if process died if (self.web_viewer_integration and self.web_viewer_integration.viewer_process and - not self.web_viewer_integration.viewer_process.is_alive()): + self.web_viewer_integration.viewer_process.poll() is not None): try: self.logger.warning("Web viewer process died, restarting...") except (AttributeError, TypeError): diff --git a/pyproject.toml b/pyproject.toml index 3bfefec..b2c9678 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "retry-requests>=1.0.0", "flask>=2.3.0", "flask-socketio>=5.3.0", + "gevent>=24.2.1", + "gevent-websocket>=0.10.1", "meshcore-cli", "feedparser>=6.0.10", "paho-mqtt>=1.6.0", @@ -46,7 +48,7 @@ docs = ["mkdocs-material>=9.0.0", "mkdocs-exclude>=1.0.0"] [project.scripts] meshcore-bot = "meshcore_bot:main" -meshcore-viewer = "web_viewer.app:main" +meshcore-viewer = "web_viewer._launcher:_main" [tool.setuptools] # Include both the main module and the modules package diff --git a/tests/test_web_viewer.py b/tests/test_web_viewer.py index cc80484..b25a602 100644 --- a/tests/test_web_viewer.py +++ b/tests/test_web_viewer.py @@ -1454,153 +1454,6 @@ class TestRateLimiterStatsRoute: # Werkzeug WebSocket compatibility patch # --------------------------------------------------------------------------- -class TestWerkzeugWebSocketFix: - """_apply_werkzeug_websocket_fix patches SimpleWebSocketWSGI.__call__ so - that Werkzeug's write() before start_response assertion is never raised - when a WebSocket session ends normally.""" - - def test_patch_is_applied_at_module_import(self): - """SimpleWebSocketWSGI.__call__ should be our patched wrapper after - importing app.py (which calls _apply_werkzeug_websocket_fix at import - time).""" - from engineio.async_drivers import _websocket_wsgi - # The patch wraps __call__; the closure name reflects the patch. - assert _websocket_wsgi.SimpleWebSocketWSGI.__call__.__name__ == '_patched_call' - - def test_patch_calls_start_response_after_handler(self): - """After the underlying __call__ returns, the patch must invoke - start_response so that status_set is not None when Werkzeug's - write(b'') runs.""" - from engineio.async_drivers import _websocket_wsgi - - sr_calls = [] - - def fake_start_response(status, headers, exc_info=None): - sr_calls.append((status, headers)) - return lambda data: None - - # Build a minimal mock SimpleWebSocketWSGI instance where __call__ - # returns [] (as _websocket_handler does on teardown). - from unittest.mock import MagicMock - from unittest.mock import patch as mock_patch - - ws_instance = MagicMock(spec=_websocket_wsgi.SimpleWebSocketWSGI) - - # Temporarily restore a fake "original" __call__ that returns [] - with mock_patch.object( - _websocket_wsgi.SimpleWebSocketWSGI, - '__call__', - new=_websocket_wsgi.SimpleWebSocketWSGI.__call__, - ): - # The real patched __call__ is already in place; call it with a - # mock "inner" that returns [] without calling start_response. - from web_viewer.app import _apply_werkzeug_websocket_fix - - captured = {} - - def mock_orig(self, environ, start_response): - captured['sr_called'] = False - return [] - - orig = _websocket_wsgi.SimpleWebSocketWSGI.__call__ - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = mock_orig - try: - _apply_werkzeug_websocket_fix() - result = _websocket_wsgi.SimpleWebSocketWSGI.__call__( - ws_instance, {}, fake_start_response - ) - finally: - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = orig - - assert result == [] - # start_response must have been called by the patch - assert len(sr_calls) == 1 - assert sr_calls[0][0] == '200 OK' - - def test_patch_tolerates_start_response_already_called(self): - """If start_response was already called (e.g. error path), the patch - must not propagate the 'Headers already set' AssertionError.""" - from unittest.mock import MagicMock - - from engineio.async_drivers import _websocket_wsgi - - from web_viewer.app import _apply_werkzeug_websocket_fix - - call_count = [0] - - def raises_on_second(status, headers, exc_info=None): - call_count[0] += 1 - if call_count[0] > 1: - raise AssertionError("Headers already set") - return lambda data: None - - ws_instance = MagicMock(spec=_websocket_wsgi.SimpleWebSocketWSGI) - - def mock_orig_already_called(self, environ, start_response): - start_response('500 INTERNAL SERVER ERROR', []) - return [] - - orig = _websocket_wsgi.SimpleWebSocketWSGI.__call__ - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = mock_orig_already_called - try: - _apply_werkzeug_websocket_fix() - # Must not raise even though start_response throws on second call - result = _websocket_wsgi.SimpleWebSocketWSGI.__call__( - ws_instance, {}, raises_on_second - ) - finally: - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = orig - - assert result == [] - - def test_patch_is_idempotent(self): - """Calling _apply_werkzeug_websocket_fix() twice must not double-wrap - and must leave the patched callable working correctly.""" - from engineio.async_drivers import _websocket_wsgi - - from web_viewer.app import _apply_werkzeug_websocket_fix - - sr_calls = [] - - def fake_sr(status, headers, exc_info=None): - sr_calls.append(status) - return lambda data: None - - # Apply a second time - _apply_werkzeug_websocket_fix() - - from unittest.mock import MagicMock - ws_instance = MagicMock(spec=_websocket_wsgi.SimpleWebSocketWSGI) - - # Temporarily replace the inner with a simple stub - def stub_orig(self, environ, start_response): - return [] - - orig = _websocket_wsgi.SimpleWebSocketWSGI.__call__ - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = stub_orig - try: - _apply_werkzeug_websocket_fix() - _websocket_wsgi.SimpleWebSocketWSGI.__call__( - ws_instance, {}, fake_sr - ) - finally: - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = orig - - # Exactly one start_response call regardless of how many times patch applied - assert len(sr_calls) == 1 - - def test_patch_handles_missing_engineio(self): - """_apply_werkzeug_websocket_fix must not raise if engineio is absent.""" - import sys - from unittest.mock import patch as mock_patch - - from web_viewer.app import _apply_werkzeug_websocket_fix - - with mock_patch.dict(sys.modules, {'engineio.async_drivers._websocket_wsgi': None}): - # Should be a no-op, not raise - _apply_werkzeug_websocket_fix() - - # =========================================================================== # TASK-01: Radio page — firmware config + reboot UI removed # =========================================================================== diff --git a/web_viewer/_launcher.py b/web_viewer/_launcher.py new file mode 100644 index 0000000..672d593 --- /dev/null +++ b/web_viewer/_launcher.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +""" +Eventlet entry point for the web viewer subprocess. + +eventlet.monkey_patch() MUST run before any other import so that +eventlet can replace stdlib sockets/threads cleanly. This module +exists solely to enforce that ordering — it is invoked via +subprocess.Popen, never imported by the bot process. +""" +import gevent.monkey # noqa: E402 — must be first +gevent.monkey.patch_all() # noqa: E402 — must be before all other imports + +import argparse +import sys +from pathlib import Path + +# Ensure project root is on sys.path when run as a subprocess script. +_root = str(Path(__file__).resolve().parent.parent) +if _root not in sys.path: + sys.path.insert(0, _root) + +from web_viewer.app import BotDataViewer + + +def _main() -> None: + parser = argparse.ArgumentParser(description='MeshCore Bot Data Viewer') + parser.add_argument('--config', default='config.ini') + parser.add_argument('--host', default='127.0.0.1') + parser.add_argument('--port', type=int, default=8080) + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + viewer = BotDataViewer(config_path=args.config) + viewer.run(host=args.host, port=args.port, debug=args.debug) + + +if __name__ == '__main__': + _main() diff --git a/web_viewer/app.py b/web_viewer/app.py index 808a557..c9220ed 100644 --- a/web_viewer/app.py +++ b/web_viewer/app.py @@ -51,38 +51,6 @@ from shared.security_utils import ( from modules.version_info import resolve_runtime_version -def _apply_werkzeug_websocket_fix() -> None: - """Patch SimpleWebSocketWSGI to call start_response after WebSocket teardown. - - python-engineio's SimpleWebSocketWSGI.__call__ handles the WebSocket - session directly on the raw socket and returns [] without ever calling - start_response. Werkzeug then calls write(b"") to flush an empty body, - which triggers ``AssertionError: write() before start_response``. - - The fix calls start_response after the handler returns so that status_set - is not None when write(b"") runs. The subsequent attempt to write HTTP - headers to the already-closed socket raises BrokenPipeError, which Werkzeug - classifies as a dropped connection and silently ignores. - """ - try: - from engineio.async_drivers import _websocket_wsgi # noqa: PLC0415 - _orig_call = _websocket_wsgi.SimpleWebSocketWSGI.__call__ - - def _patched_call(self, environ, start_response): # noqa: ANN001 - result = _orig_call(self, environ, start_response) - try: - start_response('200 OK', [('Content-Length', '0')]) - except Exception: # noqa: BLE001 - pass - return result - - _websocket_wsgi.SimpleWebSocketWSGI.__call__ = _patched_call - except (ImportError, AttributeError): - pass - - -_apply_werkzeug_websocket_fix() - # colorlog and other handlers write ANSI SGR sequences; strip for web /logs display _ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") @@ -163,7 +131,7 @@ class BotDataViewer: ping_interval=25, # 25 second ping interval (Flask-SocketIO 5.x default) logger=False, # Disable verbose logging engineio_logger=False, # Disable EngineIO logging - async_mode='threading', # Use threading for better stability + async_mode='gevent', ) self.socketio = SocketIO() @@ -288,6 +256,9 @@ class BotDataViewer: # Prevent propagation to root logger to avoid duplicate messages self.logger.propagate = False + # Suppress gevent WSGI access logs (one line per request at INFO level) + logging.getLogger('gevent.pywsgi').setLevel(logging.WARNING) + self.logger.info("Web viewer logging initialized with rotation (5MB max, 3 backups)") def _load_config(self, config_path): @@ -8241,39 +8212,12 @@ class BotDataViewer: def run(self, host='127.0.0.1', port=8080, debug=False): """Run the modern web viewer""" self.logger.info(f"Starting modern web viewer on {host}:{port}") - self._suppress_werkzeug_headers_error() try: - self.socketio.run( - self.app, - host=host, - port=port, - debug=debug, - allow_unsafe_werkzeug=True - ) + self.socketio.run(self.app, host=host, port=port, debug=debug) except Exception as e: self.logger.error(f"Error running web viewer: {e}") raise - @staticmethod - def _suppress_werkzeug_headers_error() -> None: - """Install a log filter that silences the 'Headers already set' AssertionError. - - Werkzeug's dev server catches this internally and continues serving, but it - logs a full traceback at ERROR level. The underlying cause (concurrent - SocketIO polling requests racing through the WSGI layer) is reduced by the - single-socket-per-page fix, but may still occur occasionally. The filter - downgrades these specific records to DEBUG so they don't alarm operators. - """ - import logging - - class _HeadersAlreadySetFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - msg = record.getMessage() - return "Headers already set" not in msg - - for name in ("werkzeug", "werkzeug.serving"): - logging.getLogger(name).addFilter(_HeadersAlreadySetFilter()) - def main(): """Entry point for the meshcore-viewer command""" import argparse diff --git a/web_viewer/integration.py b/web_viewer/integration.py index bf2040b..bb52304 100644 --- a/web_viewer/integration.py +++ b/web_viewer/integration.py @@ -4,9 +4,10 @@ Web Viewer Integration for MeshCore Bot Provides integration between the main bot and the web viewer """ -import multiprocessing import queue import secrets +import subprocess +import sys import threading import time from contextlib import closing, suppress @@ -16,13 +17,6 @@ from typing import Optional from modules.utils import resolve_path -def _run_viewer_process(config: str, host: str, port: int, debug: bool) -> None: - """Target for multiprocessing.Process — must live at module level to be picklable.""" - from web_viewer.app import BotDataViewer - viewer = BotDataViewer(config_path=config) - viewer.run(host=host, port=port, debug=debug) - - def normalized_web_viewer_password(config) -> str: """Return the effective web viewer password, or '' to disable the login screen. @@ -713,15 +707,17 @@ class WebViewerIntegration: self.shutting_down = True self.running = False - if self.viewer_process and self.viewer_process.is_alive(): + if self.viewer_process and self.viewer_process.poll() is None: self.logger.info("Stopping web viewer...") self.viewer_process.terminate() - self.viewer_process.join(timeout=self.viewer_stop_grace_timeout_sec) - if self.viewer_process.is_alive(): + try: + self.viewer_process.wait(timeout=self.viewer_stop_grace_timeout_sec) + self.logger.info("Web viewer stopped") + except subprocess.TimeoutExpired: self.logger.warning("Web viewer did not stop gracefully, forcing termination") self.viewer_process.kill() - self.viewer_process.join(timeout=self.viewer_stop_force_timeout_sec) - self.logger.info("Web viewer stopped") + with suppress(subprocess.TimeoutExpired): + self.viewer_process.wait(timeout=self.viewer_stop_force_timeout_sec) else: self.logger.info("Web viewer already stopped") @@ -731,35 +727,37 @@ class WebViewerIntegration: self.logger.error(f"Error stopping web viewer: {e}") def _run_viewer(self): - """Run the web viewer as a child process via multiprocessing.""" + """Run the web viewer via subprocess using the eventlet launcher.""" try: config_path = getattr(self.bot, 'config_file', 'config.ini') config_path = str(Path(config_path).resolve()) if config_path else 'config.ini' - self.viewer_process = multiprocessing.Process( - target=_run_viewer_process, - kwargs={'config': config_path, 'host': self.host, 'port': self.port, 'debug': self.debug}, - name='web-viewer', - daemon=True, - ) - self.viewer_process.start() + launcher = Path(__file__).parent / '_launcher.py' + cmd = [sys.executable, str(launcher), + '--config', config_path, + '--host', self.host, + '--port', str(self.port)] + if self.debug: + cmd.append('--debug') + + self.viewer_process = subprocess.Popen(cmd) # Brief pause to detect immediate startup failures time.sleep(2) - if not self.viewer_process.is_alive(): + if self.viewer_process.poll() is not None: self.logger.error( - "Web viewer failed to start (exit code %s)", self.viewer_process.exitcode + "Web viewer failed to start (exit code %s)", self.viewer_process.returncode ) self.viewer_process = None return self.logger.info("Web viewer integration ready for data streaming") - while self.running and self.viewer_process and self.viewer_process.is_alive(): + while self.running and self.viewer_process and self.viewer_process.poll() is None: time.sleep(1) - if self.running and self.viewer_process and not self.viewer_process.is_alive(): + if self.running and self.viewer_process and self.viewer_process.poll() is not None: if ( self.shutting_down or self.bot._shutdown_event.is_set() @@ -767,12 +765,12 @@ class WebViewerIntegration: ): self.logger.debug( "Web viewer process exited (code %s) during bot shutdown; not restarting", - self.viewer_process.exitcode, + self.viewer_process.returncode, ) return self.logger.warning( "Web viewer process exited unexpectedly (code %s) — attempting restart", - self.viewer_process.exitcode, + self.viewer_process.returncode, ) self.restart_viewer() @@ -827,4 +825,4 @@ class WebViewerIntegration: """Check if the web viewer process is healthy""" if not self.viewer_process: return False - return self.viewer_process.is_alive() + return self.viewer_process.poll() is None