From 9d768a3cf7167a46d77b5f7eb61bb90546f2ded0 Mon Sep 17 00:00:00 2001 From: agessaman Date: Thu, 2 Apr 2026 10:24:49 -0700 Subject: [PATCH] Implement MQTT weather support in weather commands and configuration - Added new configuration options in `config.ini.example` for enabling and configuring MQTT weather sources. - Updated `WxCommand` and `GlobalWxCommand` classes to retrieve weather data from custom MQTT topics, enhancing flexibility in weather data sources. - Introduced error handling for MQTT data retrieval, ensuring robust responses for various error scenarios. - Expanded translation strings in `en.json` to support MQTT-related messages, improving user feedback. These changes enhance the application's capability to utilize MQTT for weather data, providing users with more options for weather information retrieval. --- config.ini.example | 46 +++ modules/clients/mqtt_weather.py | 341 ++++++++++++++++++ .../commands/alternatives/wx_international.py | 68 +++- modules/commands/wx_command.py | 76 +++- .../service_plugins/mqtt_weather_service.py | 191 ++++++++++ tests/unit/test_mqtt_weather.py | 175 +++++++++ translations/en.json | 12 +- 7 files changed, 904 insertions(+), 5 deletions(-) create mode 100644 modules/clients/mqtt_weather.py create mode 100644 modules/service_plugins/mqtt_weather_service.py create mode 100644 tests/unit/test_mqtt_weather.py diff --git a/config.ini.example b/config.ini.example index d4229ad..c67ff4e 100644 --- a/config.ini.example +++ b/config.ini.example @@ -613,6 +613,15 @@ temperature_low_only_format = L:{low}{units} # # Note: Only one 'default' source can be set per provider type. # Future providers (e.g., custom.pwaweather.location) will follow the same pattern. +# +# MQTT snapshot weather (optional; requires [MqttWeather] enabled = true) +# custom.mqtt_weather.default = - used for bare wx / gwx (no location) +# custom.mqtt_weather. = - wx / gwx +# Topics must not contain + or #. If both MQTT and WXSIM define the same alias, MQTT is tried first. +# Example: +# custom.mqtt_weather.default = sensors/home/weather +# custom.mqtt_weather.patio = sensors/patio/weather +# Broker and format options for MQTT wx/gwx: see [MqttWeather] in the service plugin block at the end of this file. [Solar_Config] # URL timeout for external API calls (seconds) @@ -1464,6 +1473,43 @@ min_reupload_interval = 3600 # false: Standard logging (default) verbose = false +[MqttWeather] +# MQTT subscriber for custom.mqtt_weather.* topics (requires paho-mqtt). Service plugin: mqtt_weather_service. +# Topic strings belong under [Weather]: custom.mqtt_weather.default and custom.mqtt_weather. (see that section). +enabled = false + +# Broker connection (required when enabled and at least one custom.mqtt_weather.* topic is set) +# broker = mqtt.example.com +# port = 1883 +# transport = tcp +# For WebSockets: transport = websockets, set port (e.g. 443), websocket_path, often use_tls = true +# websocket_path = /mqtt +# use_tls = false +# username = +# password = +# client_id = +# qos = 0 + +# Drop incoming MQTT payloads larger than this (bytes) +# max_payload_bytes = 65536 + +# Cached reading is stale after this many seconds (no fresh message on topic) +# stale_after_seconds = 3600 + +# passthrough: UTF-8 payload sanitized and truncated for mesh (JSON or plain text) +# json_template: JSON object only; format with fixed placeholders (no arbitrary key paths) +# output_mode = passthrough +# output_mode = json_template +# Allowed placeholders in json_template: {time} {temperature_f} {temperature_c} {humidity} {device} +# json_template = {time} | {temperature_f} F / {temperature_c} C | RH {humidity}% + +# Optional filter: only accept JSON objects where the given key's string value matches +# json_device_key = device +# json_device_value = 231 + +# Max output length after sanitize (characters) +# passthrough_max_length = 500 + [Weather_Service] # Enable weather service for scheduled forecasts and alert monitoring (true/false) enabled = false diff --git a/modules/clients/mqtt_weather.py b/modules/clients/mqtt_weather.py new file mode 100644 index 0000000..0165ff3 --- /dev/null +++ b/modules/clients/mqtt_weather.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +""" +MQTT custom weather: config resolution, payload parsing, and safe formatting. + +Topics are configured under [Weather] as custom.mqtt_weather. (same pattern as WXSIM URLs). +Broker and format options live in [MqttWeather]. +""" + +from __future__ import annotations + +import json +import re +import threading +import time +from configparser import ConfigParser +from dataclasses import dataclass +from typing import Any, Optional + +from ..security_utils import sanitize_input + +MQTT_WEATHER_PREFIX = "custom.mqtt_weather." + +# Placeholders allowed in json_template (str.format_map); no arbitrary JSON keys. +JSON_TEMPLATE_PLACEHOLDERS = frozenset( + {"time", "temperature_f", "temperature_c", "humidity", "device"} +) + +_TOPIC_INVALID_CHARS = re.compile(r"[\x00+#]") + +# Max lengths for string fields extracted from JSON (avoid huge blobs in mesh replies). +_MAX_TIME_STR_LEN = 120 +_MAX_DEVICE_STR_LEN = 64 +_DEFAULT_PASSTHROUGH_MAX = 500 + + +def validate_mqtt_weather_topic(topic: str) -> bool: + """Return True if topic is non-empty and has no wildcards or NUL.""" + if not topic or not str(topic).strip(): + return False + t = str(topic).strip() + if "#" in t or "+" in t: + return False + if _TOPIC_INVALID_CHARS.search(t): + return False + return True + + +def get_mqtt_weather_topic(config: ConfigParser, location: Optional[str]) -> Optional[str]: + """Resolve MQTT topic for a logical source name (mirrors WXSIM URL lookup). + + Args: + config: Bot config. + location: None for default source, else the name from ``wx ``. + + Returns: + Topic string or None. + """ + section = "Weather" + if not config.has_section(section): + return None + + if location: + location = location.strip() + location_lower = location.lower() + prefix = MQTT_WEATHER_PREFIX + for key, value in config.items(section): + if not key.startswith(prefix): + continue + key_location = key[len(prefix) :].strip() + if key_location.lower() == location_lower: + topic = (value or "").strip() + if topic and validate_mqtt_weather_topic(topic): + return topic + return None + + default_key = f"{MQTT_WEATHER_PREFIX}default" + if config.has_option(section, default_key): + topic = config.get(section, default_key, fallback="").strip() + if topic and validate_mqtt_weather_topic(topic): + return topic + return None + + +def iter_mqtt_weather_topics(config: ConfigParser) -> list[str]: + """Return unique valid topics from all custom.mqtt_weather.* keys.""" + section = "Weather" + if not config.has_section(section): + return [] + seen: set[str] = set() + out: list[str] = [] + prefix = MQTT_WEATHER_PREFIX + for key, value in config.items(section): + if not key.startswith(prefix): + continue + topic = (value or "").strip() + if not topic or not validate_mqtt_weather_topic(topic): + continue + if topic not in seen: + seen.add(topic) + out.append(topic) + return out + + +@dataclass +class MqttWeatherFormatConfig: + """Formatting options loaded from [MqttWeather].""" + + output_mode: str # passthrough | json_template + json_template: str + json_device_key: str + json_device_value: str + max_payload_bytes: int + passthrough_max_length: int + stale_after_seconds: float + + +def load_mqtt_weather_format_config(config: ConfigParser) -> MqttWeatherFormatConfig: + """Read [MqttWeather] formatting options (defaults are safe).""" + sec = "MqttWeather" + mode = "passthrough" + if config.has_section(sec): + mode = config.get(sec, "output_mode", fallback="passthrough").strip().lower() + if mode not in ("passthrough", "json_template"): + mode = "passthrough" + + default_template = ( + "{time} | {temperature_f}°F ({temperature_c}°C) | RH {humidity}%" + ) + template = default_template + dev_key = "" + dev_val = "" + max_bytes = 65536 + pass_max = _DEFAULT_PASSTHROUGH_MAX + stale = 3600.0 + + if config.has_section(sec): + template = config.get(sec, "json_template", fallback=default_template) + dev_key = config.get(sec, "json_device_key", fallback="").strip() + dev_val = config.get(sec, "json_device_value", fallback="").strip() + max_bytes = config.getint(sec, "max_payload_bytes", fallback=65536) + pass_max = config.getint(sec, "passthrough_max_length", fallback=_DEFAULT_PASSTHROUGH_MAX) + stale = config.getfloat(sec, "stale_after_seconds", fallback=3600.0) + + if max_bytes < 256: + max_bytes = 256 + if max_bytes > 1_048_576: + max_bytes = 1_048_576 + if pass_max < 64: + pass_max = 64 + if pass_max > 4000: + pass_max = 4000 + if stale < 5.0: + stale = 5.0 + if stale > 86400.0 * 7: + stale = 86400.0 * 7 + + return MqttWeatherFormatConfig( + output_mode=mode, + json_template=template, + json_device_key=dev_key, + json_device_value=dev_val, + max_payload_bytes=max_bytes, + passthrough_max_length=pass_max, + stale_after_seconds=stale, + ) + + +def _normalize_json_scalar(value: Any) -> str: + if value is None: + return "" + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + if isinstance(value, float) and (value != value): # NaN + return "" + return str(value) + return str(value).strip() + + +def _safe_time_str(value: Any) -> str: + s = _normalize_json_scalar(value) + if len(s) > _MAX_TIME_STR_LEN: + s = s[:_MAX_TIME_STR_LEN] + return sanitize_input(s, max_length=_MAX_TIME_STR_LEN, strip_controls=True) + + +def _safe_device_str(value: Any) -> str: + s = _normalize_json_scalar(value) + if len(s) > _MAX_DEVICE_STR_LEN: + s = s[:_MAX_DEVICE_STR_LEN] + return sanitize_input(s, max_length=_MAX_DEVICE_STR_LEN, strip_controls=True) + + +def _coerce_float(value: Any) -> Optional[float]: + if value is None: + return None + if isinstance(value, bool): + return None + if isinstance(value, (int, float)): + f = float(value) + if f != f or abs(f) > 1e6: # NaN or absurd + return None + return f + if isinstance(value, str): + try: + f = float(value.strip()) + if f != f or abs(f) > 1e6: + return None + return f + except ValueError: + return None + return None + + +def _validate_json_template(template: str) -> Optional[str]: + """Return error message if template uses unknown placeholders.""" + for m in re.finditer(r"\{([^}]+)\}", template): + name = m.group(1).strip() + if name and name not in JSON_TEMPLATE_PLACEHOLDERS: + return f"Invalid template placeholder: {{{name}}}" + return None + + +def format_mqtt_weather_payload( + payload: bytes, + fmt: MqttWeatherFormatConfig, +) -> tuple[Optional[str], Optional[str]]: + """Turn raw MQTT payload bytes into display text. + + Returns: + (text, error) — exactly one is non-None. + """ + if not payload: + return None, "empty_payload" + + if len(payload) > fmt.max_payload_bytes: + return None, "payload_too_large" + + try: + text = payload.decode("utf-8", errors="replace") + except Exception: + return None, "decode_error" + + if fmt.output_mode == "passthrough": + out = sanitize_input( + text, max_length=fmt.passthrough_max_length, strip_controls=True + ) + if not out: + return None, "empty_after_sanitize" + return out, None + + # json_template + tmpl_err = _validate_json_template(fmt.json_template) + if tmpl_err: + return None, tmpl_err + + try: + data = json.loads(text) + except json.JSONDecodeError: + return None, "invalid_json" + + if not isinstance(data, dict): + return None, "json_not_object" + + if fmt.json_device_key: + if _normalize_json_scalar(data.get(fmt.json_device_key)) != fmt.json_device_value: + return None, "device_filter_mismatch" + + temp_f = _coerce_float(data.get("temperature_F")) + if temp_f is None: + temp_f = _coerce_float(data.get("temperature_f")) + hum = _coerce_float(data.get("humidity")) + if hum is not None: + hum = round(hum, 2) + if hum < 0 or hum > 100: + hum = None + + temp_c: Optional[float] = None + if temp_f is not None: + temp_c = (temp_f - 32.0) * 5.0 / 9.0 + + mapping = { + "time": _safe_time_str(data.get("time")), + "device": _safe_device_str(data.get("device")), + "temperature_f": f"{temp_f:.2f}" if temp_f is not None else "", + "temperature_c": f"{temp_c:.2f}" if temp_c is not None else "", + "humidity": f"{hum:.0f}" if hum is not None else "", + } + + try: + out = fmt.json_template.format_map(mapping) + except (KeyError, ValueError, IndexError): + return None, "template_format_error" + + out = sanitize_input(out, max_length=fmt.passthrough_max_length, strip_controls=True) + if not out: + return None, "empty_after_sanitize" + return out, None + + +class MqttWeatherCache: + """Thread-safe last-value cache per subscribed topic.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._by_topic: dict[str, tuple[bytes, float]] = {} + + def update(self, topic: str, payload: bytes) -> None: + with self._lock: + self._by_topic[topic] = (payload, time.monotonic()) + + def get(self, topic: str) -> tuple[Optional[bytes], Optional[float]]: + with self._lock: + row = self._by_topic.get(topic) + if not row: + return None, None + return row[0], row[1] + + def clear(self) -> None: + with self._lock: + self._by_topic.clear() + + +def mqtt_weather_display_for_topic( + topic: str, + cache: Optional[MqttWeatherCache], + fmt: MqttWeatherFormatConfig, +) -> tuple[Optional[str], Optional[str]]: + """Read cache for topic, check staleness, return formatted line or error key.""" + if cache is None: + return None, "no_cache" + + payload, ts = cache.get(topic) + if payload is None or ts is None: + return None, "no_data" + + age = time.monotonic() - ts + if age > fmt.stale_after_seconds: + return None, "stale" + + return format_mqtt_weather_payload(payload, fmt) diff --git a/modules/commands/alternatives/wx_international.py b/modules/commands/alternatives/wx_international.py index f6961e2..5695cb4 100644 --- a/modules/commands/alternatives/wx_international.py +++ b/modules/commands/alternatives/wx_international.py @@ -28,6 +28,12 @@ except ImportError: WXSIM_PARSER_AVAILABLE = False WXSIMParser = None +from ...clients.mqtt_weather import ( + get_mqtt_weather_topic, + load_mqtt_weather_format_config, + mqtt_weather_display_for_topic, +) + # Multiday: plain digits, 7day/7-day, or suffix form 7d/10d (min 2, max below). Open-Meteo allows up to 16 forecast days. GWX_MULTIDAY_MAX_DAYS = 16 @@ -41,7 +47,8 @@ class GlobalWxCommand(BaseCommand): description = "Get weather information for any global location (usage: gwx Tokyo)" category = "weather" cooldown_seconds = 5 # 5 second cooldown per user to prevent API abuse - requires_internet = True # Requires internet access for Open-Meteo API and geocoding + # Open-Meteo/geocoding need the network; custom MQTT/WXSIM may be LAN-only. + requires_internet = False # Documentation short_description = "Get weather for any global location using Open-Meteo API" @@ -206,6 +213,34 @@ class GlobalWxCommand(BaseCommand): self.logger.debug(f"Error getting bot location: {e}") return None + def _get_custom_mqtt_weather_topic(self, location: Optional[str] = None) -> Optional[str]: + return get_mqtt_weather_topic(self.bot.config, location) + + def _mqtt_weather_line( + self, + topic: str, + forecast_type: str, + location_name: Optional[str], + ) -> str: + if forecast_type != "default": + return self.translate("commands.gwx.mqtt_forecast_not_supported") + + fmt = load_mqtt_weather_format_config(self.bot.config) + cache = getattr(self.bot, "mqtt_weather_cache", None) + text, err = mqtt_weather_display_for_topic(topic, cache, fmt) + if text is not None: + if location_name: + return f"{location_name}: {text}" + return text + if err == "no_cache": + return self.translate("commands.gwx.mqtt_weather_no_subscriber") + if err in ("no_data", "empty_payload", "empty_after_sanitize"): + return self.translate("commands.gwx.mqtt_weather_no_data") + if err == "stale": + return self.translate("commands.gwx.mqtt_weather_stale") + detail = (err or "unknown").replace("_", " ") + return self.translate("commands.gwx.mqtt_weather_payload_error", detail=detail) + def _get_custom_wxsim_source(self, location: Optional[str] = None) -> Optional[str]: """Get custom WXSIM source URL from config. @@ -386,8 +421,20 @@ class GlobalWxCommand(BaseCommand): # Parse the command to extract location and forecast type parts = content.split() - # If no location specified, check for custom WXSIM default source first + # If no location specified, check custom MQTT then WXSIM default sources if len(parts) < 2: + mqtt_topic = self._get_custom_mqtt_weather_topic(None) + if mqtt_topic: + try: + self.record_execution(message.sender_id) + weather_data = self._mqtt_weather_line(mqtt_topic, "default", None) + await self.send_response(message, weather_data) + return True + except Exception as e: + self.logger.error(f"Error reading MQTT weather: {e}") + await self.send_response(message, self.translate("commands.gwx.error", error=str(e))) + return True + wxsim_source = self._get_custom_wxsim_source(None) # Check for default if wxsim_source: # Use custom WXSIM default source @@ -484,6 +531,23 @@ class GlobalWxCommand(BaseCommand): await self.send_response(message, self.translate('commands.gwx.usage')) return True + # Custom MQTT before WXSIM + mqtt_topic = self._get_custom_mqtt_weather_topic(location) + if mqtt_topic: + self.logger.info(f"Using custom MQTT weather topic for location '{location}': {mqtt_topic}") + try: + self.record_execution(message.sender_id) + weather_data = self._mqtt_weather_line(mqtt_topic, forecast_type, location) + if forecast_type == "multiday": + await self._send_multiday_forecast(message, weather_data) + else: + await self.send_response(message, weather_data) + return True + except Exception as e: + self.logger.error(f"Error reading MQTT weather: {e}") + await self.send_response(message, self.translate("commands.gwx.error", error=str(e))) + return True + # Check for custom WXSIM source first (before normal geocoding) wxsim_source = self._get_custom_wxsim_source(location) if wxsim_source: diff --git a/modules/commands/wx_command.py b/modules/commands/wx_command.py index 6337a6d..5213aa3 100644 --- a/modules/commands/wx_command.py +++ b/modules/commands/wx_command.py @@ -39,6 +39,12 @@ except ImportError: WXSIM_PARSER_AVAILABLE = False WXSIMParser = None +from ..clients.mqtt_weather import ( + get_mqtt_weather_topic, + load_mqtt_weather_format_config, + mqtt_weather_display_for_topic, +) + # Multiday: plain digits (e.g. 7), 7day/7-day, or suffix form 7d/10d (min 2, max below). WX_MULTIDAY_MAX_DAYS = 16 @@ -52,7 +58,8 @@ class WxCommand(BaseCommand): description = "Get weather information for a zip code (usage: wx 12345)" category = "weather" cooldown_seconds = 5 # 5 second cooldown per user to prevent API abuse - requires_internet = True # Requires internet access for NOAA API and geocoding + # NOAA/geocoding need the network, but custom WXSIM/MQTT sources may be LAN-only; check connectivity inside execute paths. + requires_internet = False # Documentation short_description = "Get weather for a US location using NOAA weather data" @@ -299,6 +306,39 @@ class WxCommand(BaseCommand): return None + def _get_custom_mqtt_weather_topic(self, location: Optional[str] = None) -> Optional[str]: + """MQTT topic for custom.mqtt_weather. (see get_mqtt_weather_topic).""" + return get_mqtt_weather_topic(self.bot.config, location) + + def _mqtt_weather_line( + self, + topic: str, + forecast_type: str, + location_name: Optional[str], + ) -> str: + """Format cached MQTT payload for wx output.""" + if forecast_type != "default": + return self.translate("commands.wx.mqtt_forecast_not_supported") + + fmt = load_mqtt_weather_format_config(self.bot.config) + cache = getattr(self.bot, "mqtt_weather_cache", None) + text, err = mqtt_weather_display_for_topic(topic, cache, fmt) + if text is not None: + if location_name: + return f"{location_name}: {text}" + return text + return self._mqtt_weather_error_key(err) + + def _mqtt_weather_error_key(self, err: Optional[str]) -> str: + if err == "no_cache": + return self.translate("commands.wx.mqtt_weather_no_subscriber") + if err in ("no_data", "empty_payload", "empty_after_sanitize"): + return self.translate("commands.wx.mqtt_weather_no_data") + if err == "stale": + return self.translate("commands.wx.mqtt_weather_stale") + detail = (err or "unknown").replace("_", " ") + return self.translate("commands.wx.mqtt_weather_payload_error", detail=detail) + def _get_wxsim_weather(self, source_url: str, forecast_type: str = "default", num_days: int = 7, message: MeshMessage = None, location_name: Optional[str] = None) -> str: @@ -449,8 +489,20 @@ class WxCommand(BaseCommand): # Track if we're using companion location (so we always show location in response) using_companion_location = False - # If no location specified, check for custom WXSIM default source first + # If no location specified, check custom MQTT then WXSIM default sources if len(parts) < 2: + mqtt_topic = self._get_custom_mqtt_weather_topic(None) + if mqtt_topic: + try: + self.record_execution(message.sender_id) + weather_data = self._mqtt_weather_line(mqtt_topic, "default", None) + await self.send_response(message, weather_data) + return True + except Exception as e: + self.logger.error(f"Error reading MQTT weather: {e}") + await self.send_response(message, self.translate("commands.wx.error", error=str(e))) + return True + wxsim_source = self._get_custom_wxsim_source(None) # Check for default if wxsim_source: # Use custom WXSIM default source @@ -554,6 +606,26 @@ class WxCommand(BaseCommand): await self.send_response(message, self.translate('commands.wx.usage')) return True + # Custom MQTT before WXSIM; skip snapshot sources when user asked for NOAA alerts + if not show_full_alerts: + mqtt_topic = self._get_custom_mqtt_weather_topic(location) + if mqtt_topic: + self.logger.info(f"Using custom MQTT weather topic for location '{location}': {mqtt_topic}") + try: + self.record_execution(message.sender_id) + weather_data = self._mqtt_weather_line( + mqtt_topic, forecast_type, location + ) + if forecast_type == "multiday": + await self._send_multiday_forecast(message, weather_data) + else: + await self.send_response(message, weather_data) + return True + except Exception as e: + self.logger.error(f"Error reading MQTT weather: {e}") + await self.send_response(message, self.translate("commands.wx.error", error=str(e))) + return True + # Check for custom WXSIM source first (before checking location type) wxsim_source = self._get_custom_wxsim_source(location) if wxsim_source: diff --git a/modules/service_plugins/mqtt_weather_service.py b/modules/service_plugins/mqtt_weather_service.py new file mode 100644 index 0000000..a065e2d --- /dev/null +++ b/modules/service_plugins/mqtt_weather_service.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +""" +MQTT subscriber for custom weather topics (custom.mqtt_weather.* in [Weather]). +Caches the latest payload per topic for wx / gwx commands. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from typing import Any, Optional + +try: + import paho.mqtt.client as mqtt +except ImportError: + mqtt = None + +from ..clients.mqtt_weather import MqttWeatherCache, iter_mqtt_weather_topics +from .base_service import BaseServicePlugin + + +class MqttWeatherService(BaseServicePlugin): + """Subscribe to configured MQTT weather topics and update bot.mqtt_weather_cache.""" + + config_section = "MqttWeather" + description = "MQTT subscriber for custom.mqtt_weather.* wx/gwx sources" + + def __init__(self, bot: Any): + super().__init__(bot) + self.logger = logging.getLogger("MqttWeatherService") + self.logger.setLevel(bot.logger.level) + self._client: Any = None + self._cache: Optional[MqttWeatherCache] = None + self._topics: list[str] = [] + + def _parse_broker_config(self) -> Optional[dict[str, Any]]: + cfg = self.bot.config + sec = "MqttWeather" + if not cfg.has_section(sec): + return None + host = cfg.get(sec, "broker", fallback="").strip() + if not host: + self.logger.error("MqttWeather: broker hostname is required") + return None + port = cfg.getint(sec, "port", fallback=1883) + transport = cfg.get(sec, "transport", fallback="tcp").strip().lower() + ws_path = cfg.get(sec, "websocket_path", fallback="/mqtt").strip() or "/mqtt" + use_tls = cfg.getboolean(sec, "use_tls", fallback=False) + username = cfg.get(sec, "username", fallback="").strip() or None + password = cfg.get(sec, "password", fallback="").strip() or None + client_id = cfg.get(sec, "client_id", fallback="").strip() or None + qos = cfg.getint(sec, "qos", fallback=0) + if qos not in (0, 1, 2): + qos = 0 + return { + "host": host, + "port": port, + "transport": transport, + "websocket_path": ws_path, + "use_tls": use_tls, + "username": username, + "password": password, + "client_id": client_id, + "qos": qos, + } + + async def start(self) -> None: + if not self.enabled: + return + + self._topics = iter_mqtt_weather_topics(self.bot.config) + if not self._topics: + self.logger.warning( + "MqttWeather enabled but no valid custom.mqtt_weather.* topics in [Weather]; " + "subscriber not started" + ) + self._running = True + return + + if mqtt is None: + self.logger.error( + "MqttWeather: paho-mqtt not installed; pip install paho-mqtt" + ) + self._running = True + return + + broker = self._parse_broker_config() + if not broker: + self._running = True + return + + self._cache = MqttWeatherCache() + self.bot.mqtt_weather_cache = self._cache + + bot_name = self.bot.config.get("Bot", "bot_name", fallback="MeshCoreBot") + client_id = broker["client_id"] + if not client_id: + safe_name = "".join(c if c.isalnum() or c == "-" else "-" for c in bot_name) + client_id = f"{safe_name}-mqtt-wx-{os.getpid()}" + + topics = self._topics + qos = broker["qos"] + cache = self._cache + logger = self.logger + + def on_message(_client: Any, _userdata: Any, msg: Any) -> None: + try: + payload = msg.payload + if payload is None: + return + if isinstance(payload, (bytes, bytearray)): + b = bytes(payload) + else: + b = str(payload).encode("utf-8", errors="replace") + topic_str = getattr(msg, "topic", None) + if isinstance(topic_str, bytes): + topic_str = topic_str.decode("utf-8", errors="replace") + if not topic_str: + return + cache.update(topic_str, b) + except Exception as e: + logger.debug(f"MqttWeather on_message error: {e}") + + def on_connect(client: Any, userdata: Any, flags: Any, rc: int, properties: Any = None) -> None: + if rc != 0: + logger.warning(f"MqttWeather connect failed rc={rc}") + return + for t in userdata: + try: + client.subscribe(t, qos=qos) + logger.info(f"MqttWeather subscribed to {t!r} (qos={qos})") + except Exception as e: + logger.error(f"MqttWeather subscribe failed for {t!r}: {e}") + + transport = broker["transport"] + try: + if transport == "websockets": + self._client = mqtt.Client( + client_id=client_id, + userdata=topics, + transport="websockets", + ) + self._client.ws_set_options(path=broker["websocket_path"], headers=None) + else: + self._client = mqtt.Client(client_id=client_id, userdata=topics) + + self._client.reconnect_delay_set(min_delay=1, max_delay=120) + self._client.on_connect = on_connect + self._client.on_message = on_message + + if broker["use_tls"]: + import ssl + + self._client.tls_set(cert_reqs=ssl.CERT_NONE) + + if broker["username"]: + self._client.username_pw_set(broker["username"], broker["password"]) + + loop = asyncio.get_event_loop() + + def do_connect() -> None: + self._client.connect(broker["host"], broker["port"], keepalive=60) + + await loop.run_in_executor(None, do_connect) + self._client.loop_start() + self.logger.info( + f"MqttWeather MQTT started ({broker['host']}:{broker['port']}, {transport})" + ) + except Exception as e: + self.logger.error(f"MqttWeather failed to start MQTT client: {e}") + self._client = None + + self._running = True + + async def stop(self) -> None: + if self._client is not None: + try: + self._client.loop_stop() + self._client.disconnect() + except Exception as e: + self.logger.debug(f"MqttWeather disconnect: {e}") + self._client = None + + if self._cache is not None: + self._cache.clear() + self._cache = None + if getattr(self.bot, "mqtt_weather_cache", None) is not None: + delattr(self.bot, "mqtt_weather_cache") + + self._running = False diff --git a/tests/unit/test_mqtt_weather.py b/tests/unit/test_mqtt_weather.py new file mode 100644 index 0000000..a1355df --- /dev/null +++ b/tests/unit/test_mqtt_weather.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +"""Unit tests for modules.clients.mqtt_weather.""" + +import json +import time +from configparser import ConfigParser + +from modules.clients.mqtt_weather import ( + MqttWeatherCache, + MqttWeatherFormatConfig, + format_mqtt_weather_payload, + get_mqtt_weather_topic, + iter_mqtt_weather_topics, + mqtt_weather_display_for_topic, + validate_mqtt_weather_topic, +) + + +def test_validate_mqtt_topic() -> None: + assert validate_mqtt_weather_topic("home/weather") is True + assert validate_mqtt_weather_topic("") is False + assert validate_mqtt_weather_topic("a/+") is False + assert validate_mqtt_weather_topic("a/#") is False + + +def test_get_mqtt_weather_topic_named_and_default() -> None: + cfg = ConfigParser() + cfg.read_dict( + { + "Weather": { + "custom.mqtt_weather.default": "t/default", + "custom.mqtt_weather.patio": "t/patio", + } + } + ) + assert get_mqtt_weather_topic(cfg, None) == "t/default" + assert get_mqtt_weather_topic(cfg, "patio") == "t/patio" + assert get_mqtt_weather_topic(cfg, "missing") is None + + +def test_iter_mqtt_weather_topics_unique() -> None: + cfg = ConfigParser() + cfg.read_dict( + { + "Weather": { + "custom.mqtt_weather.default": "same/topic", + "custom.mqtt_weather.a": "same/topic", + "custom.mqtt_weather.bad": "+/invalid", + } + } + ) + topics = iter_mqtt_weather_topics(cfg) + assert topics == ["same/topic"] + + +def test_format_passthrough() -> None: + fmt = MqttWeatherFormatConfig( + output_mode="passthrough", + json_template="", + json_device_key="", + json_device_value="", + max_payload_bytes=1024, + passthrough_max_length=200, + stale_after_seconds=60.0, + ) + raw = b"Date/Time: now\nTemp: 20 C" + text, err = format_mqtt_weather_payload(raw, fmt) + assert err is None + assert text == "Date/Time: now\nTemp: 20 C" + + +def test_format_json_template_and_device_filter() -> None: + fmt = MqttWeatherFormatConfig( + output_mode="json_template", + json_template="{time} | {temperature_f}F / {temperature_c}C | {humidity}%", + json_device_key="device", + json_device_value="231", + max_payload_bytes=1024, + passthrough_max_length=500, + stale_after_seconds=60.0, + ) + payload = { + "device": "231", + "temperature_F": 50.0, + "humidity": 55, + "time": "2026-04-01 12:00", + } + raw = json.dumps(payload).encode() + text, err = format_mqtt_weather_payload(raw, fmt) + assert err is None + assert "50.00F" in text or "50.00" in text + assert "10.00C" in text or "10.00" in text + assert "55%" in text + + payload_bad = dict(payload) + payload_bad["device"] = "999" + raw2 = json.dumps(payload_bad).encode() + text2, err2 = format_mqtt_weather_payload(raw2, fmt) + assert text2 is None + assert err2 == "device_filter_mismatch" + + +def test_payload_too_large() -> None: + fmt = MqttWeatherFormatConfig( + output_mode="passthrough", + json_template="", + json_device_key="", + json_device_value="", + max_payload_bytes=10, + passthrough_max_length=500, + stale_after_seconds=60.0, + ) + text, err = format_mqtt_weather_payload(b"x" * 20, fmt) + assert text is None + assert err == "payload_too_large" + + +def test_invalid_template_placeholder() -> None: + fmt = MqttWeatherFormatConfig( + output_mode="json_template", + json_template="{time} {bogus}", + json_device_key="", + json_device_value="", + max_payload_bytes=1024, + passthrough_max_length=500, + stale_after_seconds=60.0, + ) + raw = json.dumps( + {"time": "t", "temperature_F": 32, "humidity": 40, "device": "1"} + ).encode() + text, err = format_mqtt_weather_payload(raw, fmt) + assert text is None + assert err is not None + assert "bogus" in err + + +def test_mqtt_weather_display_stale() -> None: + cache = MqttWeatherCache() + cache.update("t1", b"hello") + # force old timestamp + with cache._lock: + payload, _ts = cache._by_topic["t1"] + cache._by_topic["t1"] = (payload, time.monotonic() - 100.0) + + fmt = MqttWeatherFormatConfig( + output_mode="passthrough", + json_template="", + json_device_key="", + json_device_value="", + max_payload_bytes=1024, + passthrough_max_length=500, + stale_after_seconds=30.0, + ) + text, err = mqtt_weather_display_for_topic("t1", cache, fmt) + assert text is None + assert err == "stale" + + +def test_mqtt_weather_display_no_cache() -> None: + fmt = MqttWeatherFormatConfig( + output_mode="passthrough", + json_template="", + json_device_key="", + json_device_value="", + max_payload_bytes=1024, + passthrough_max_length=500, + stale_after_seconds=60.0, + ) + text, err = mqtt_weather_display_for_topic("missing", MqttWeatherCache(), fmt) + assert text is None + assert err == "no_data" + + text2, err2 = mqtt_weather_display_for_topic("missing", None, fmt) + assert text2 is None + assert err2 == "no_cache" diff --git a/translations/en.json b/translations/en.json index 3ec907d..bb1408b 100644 --- a/translations/en.json +++ b/translations/en.json @@ -79,7 +79,12 @@ "tomorrow_not_available": "Tomorrow's forecast not available", "tomorrow_error": "Error formatting tomorrow's forecast", "multiday_not_available": "{num_days}-day forecast not available", - "multiday_error": "Error formatting {num_days}-day forecast" + "multiday_error": "Error formatting {num_days}-day forecast", + "mqtt_forecast_not_supported": "Extended forecast is not available for MQTT weather sources", + "mqtt_weather_no_subscriber": "MQTT weather subscriber is not active (enable [MqttWeather] and custom.mqtt_weather.* topics)", + "mqtt_weather_no_data": "No MQTT weather message received for this topic yet", + "mqtt_weather_stale": "MQTT weather data is too old", + "mqtt_weather_payload_error": "MQTT weather payload error: {detail}" }, "gwx": { "description": "Get weather information for any global location (usage: gwx Tokyo)", @@ -93,6 +98,11 @@ "tomorrow_error": "Error formatting tomorrow's forecast", "multiday_not_available": "{num_days}-day forecast not available", "multiday_error": "Error formatting {num_days}-day forecast", + "mqtt_forecast_not_supported": "Extended forecast is not available for MQTT weather sources", + "mqtt_weather_no_subscriber": "MQTT weather subscriber is not active (enable [MqttWeather] and custom.mqtt_weather.* topics)", + "mqtt_weather_no_data": "No MQTT weather message received for this topic yet", + "mqtt_weather_stale": "MQTT weather data is too old", + "mqtt_weather_payload_error": "MQTT weather payload error: {detail}", "periods": { "today": "Today", "tonight": "Tonight",