Implement MQTT weather support in weather commands and configuration

- Added new configuration options in `config.ini.example` for enabling and configuring MQTT weather sources.
- Updated `WxCommand` and `GlobalWxCommand` classes to retrieve weather data from custom MQTT topics, enhancing flexibility in weather data sources.
- Introduced error handling for MQTT data retrieval, ensuring robust responses for various error scenarios.
- Expanded translation strings in `en.json` to support MQTT-related messages, improving user feedback.

These changes enhance the application's capability to utilize MQTT for weather data, providing users with more options for weather information retrieval.
This commit is contained in:
agessaman
2026-04-02 10:24:49 -07:00
parent e3d4e2db6e
commit 9d768a3cf7
7 changed files with 904 additions and 5 deletions
+46
View File
@@ -613,6 +613,15 @@ temperature_low_only_format = L:{low}{units}
#
# Note: Only one 'default' source can be set per provider type.
# Future providers (e.g., custom.pwaweather.location) will follow the same pattern.
#
# MQTT snapshot weather (optional; requires [MqttWeather] enabled = true)
# custom.mqtt_weather.default = <topic> - used for bare wx / gwx (no location)
# custom.mqtt_weather.<name> = <topic> - wx <name> / gwx <name>
# Topics must not contain + or #. If both MQTT and WXSIM define the same alias, MQTT is tried first.
# Example:
# custom.mqtt_weather.default = sensors/home/weather
# custom.mqtt_weather.patio = sensors/patio/weather
# Broker and format options for MQTT wx/gwx: see [MqttWeather] in the service plugin block at the end of this file.
[Solar_Config]
# URL timeout for external API calls (seconds)
@@ -1464,6 +1473,43 @@ min_reupload_interval = 3600
# false: Standard logging (default)
verbose = false
[MqttWeather]
# MQTT subscriber for custom.mqtt_weather.* topics (requires paho-mqtt). Service plugin: mqtt_weather_service.
# Topic strings belong under [Weather]: custom.mqtt_weather.default and custom.mqtt_weather.<name> (see that section).
enabled = false
# Broker connection (required when enabled and at least one custom.mqtt_weather.* topic is set)
# broker = mqtt.example.com
# port = 1883
# transport = tcp
# For WebSockets: transport = websockets, set port (e.g. 443), websocket_path, often use_tls = true
# websocket_path = /mqtt
# use_tls = false
# username =
# password =
# client_id =
# qos = 0
# Drop incoming MQTT payloads larger than this (bytes)
# max_payload_bytes = 65536
# Cached reading is stale after this many seconds (no fresh message on topic)
# stale_after_seconds = 3600
# passthrough: UTF-8 payload sanitized and truncated for mesh (JSON or plain text)
# json_template: JSON object only; format with fixed placeholders (no arbitrary key paths)
# output_mode = passthrough
# output_mode = json_template
# Allowed placeholders in json_template: {time} {temperature_f} {temperature_c} {humidity} {device}
# json_template = {time} | {temperature_f} F / {temperature_c} C | RH {humidity}%
# Optional filter: only accept JSON objects where the given key's string value matches
# json_device_key = device
# json_device_value = 231
# Max output length after sanitize (characters)
# passthrough_max_length = 500
[Weather_Service]
# Enable weather service for scheduled forecasts and alert monitoring (true/false)
enabled = false
+341
View File
@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""
MQTT custom weather: config resolution, payload parsing, and safe formatting.
Topics are configured under [Weather] as custom.mqtt_weather.<name> (same pattern as WXSIM URLs).
Broker and format options live in [MqttWeather].
"""
from __future__ import annotations
import json
import re
import threading
import time
from configparser import ConfigParser
from dataclasses import dataclass
from typing import Any, Optional
from ..security_utils import sanitize_input
MQTT_WEATHER_PREFIX = "custom.mqtt_weather."
# Placeholders allowed in json_template (str.format_map); no arbitrary JSON keys.
JSON_TEMPLATE_PLACEHOLDERS = frozenset(
{"time", "temperature_f", "temperature_c", "humidity", "device"}
)
_TOPIC_INVALID_CHARS = re.compile(r"[\x00+#]")
# Max lengths for string fields extracted from JSON (avoid huge blobs in mesh replies).
_MAX_TIME_STR_LEN = 120
_MAX_DEVICE_STR_LEN = 64
_DEFAULT_PASSTHROUGH_MAX = 500
def validate_mqtt_weather_topic(topic: str) -> bool:
"""Return True if topic is non-empty and has no wildcards or NUL."""
if not topic or not str(topic).strip():
return False
t = str(topic).strip()
if "#" in t or "+" in t:
return False
if _TOPIC_INVALID_CHARS.search(t):
return False
return True
def get_mqtt_weather_topic(config: ConfigParser, location: Optional[str]) -> Optional[str]:
"""Resolve MQTT topic for a logical source name (mirrors WXSIM URL lookup).
Args:
config: Bot config.
location: None for default source, else the name from ``wx <name>``.
Returns:
Topic string or None.
"""
section = "Weather"
if not config.has_section(section):
return None
if location:
location = location.strip()
location_lower = location.lower()
prefix = MQTT_WEATHER_PREFIX
for key, value in config.items(section):
if not key.startswith(prefix):
continue
key_location = key[len(prefix) :].strip()
if key_location.lower() == location_lower:
topic = (value or "").strip()
if topic and validate_mqtt_weather_topic(topic):
return topic
return None
default_key = f"{MQTT_WEATHER_PREFIX}default"
if config.has_option(section, default_key):
topic = config.get(section, default_key, fallback="").strip()
if topic and validate_mqtt_weather_topic(topic):
return topic
return None
def iter_mqtt_weather_topics(config: ConfigParser) -> list[str]:
"""Return unique valid topics from all custom.mqtt_weather.* keys."""
section = "Weather"
if not config.has_section(section):
return []
seen: set[str] = set()
out: list[str] = []
prefix = MQTT_WEATHER_PREFIX
for key, value in config.items(section):
if not key.startswith(prefix):
continue
topic = (value or "").strip()
if not topic or not validate_mqtt_weather_topic(topic):
continue
if topic not in seen:
seen.add(topic)
out.append(topic)
return out
@dataclass
class MqttWeatherFormatConfig:
"""Formatting options loaded from [MqttWeather]."""
output_mode: str # passthrough | json_template
json_template: str
json_device_key: str
json_device_value: str
max_payload_bytes: int
passthrough_max_length: int
stale_after_seconds: float
def load_mqtt_weather_format_config(config: ConfigParser) -> MqttWeatherFormatConfig:
"""Read [MqttWeather] formatting options (defaults are safe)."""
sec = "MqttWeather"
mode = "passthrough"
if config.has_section(sec):
mode = config.get(sec, "output_mode", fallback="passthrough").strip().lower()
if mode not in ("passthrough", "json_template"):
mode = "passthrough"
default_template = (
"{time} | {temperature_f}°F ({temperature_c}°C) | RH {humidity}%"
)
template = default_template
dev_key = ""
dev_val = ""
max_bytes = 65536
pass_max = _DEFAULT_PASSTHROUGH_MAX
stale = 3600.0
if config.has_section(sec):
template = config.get(sec, "json_template", fallback=default_template)
dev_key = config.get(sec, "json_device_key", fallback="").strip()
dev_val = config.get(sec, "json_device_value", fallback="").strip()
max_bytes = config.getint(sec, "max_payload_bytes", fallback=65536)
pass_max = config.getint(sec, "passthrough_max_length", fallback=_DEFAULT_PASSTHROUGH_MAX)
stale = config.getfloat(sec, "stale_after_seconds", fallback=3600.0)
if max_bytes < 256:
max_bytes = 256
if max_bytes > 1_048_576:
max_bytes = 1_048_576
if pass_max < 64:
pass_max = 64
if pass_max > 4000:
pass_max = 4000
if stale < 5.0:
stale = 5.0
if stale > 86400.0 * 7:
stale = 86400.0 * 7
return MqttWeatherFormatConfig(
output_mode=mode,
json_template=template,
json_device_key=dev_key,
json_device_value=dev_val,
max_payload_bytes=max_bytes,
passthrough_max_length=pass_max,
stale_after_seconds=stale,
)
def _normalize_json_scalar(value: Any) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (int, float)):
if isinstance(value, float) and (value != value): # NaN
return ""
return str(value)
return str(value).strip()
def _safe_time_str(value: Any) -> str:
s = _normalize_json_scalar(value)
if len(s) > _MAX_TIME_STR_LEN:
s = s[:_MAX_TIME_STR_LEN]
return sanitize_input(s, max_length=_MAX_TIME_STR_LEN, strip_controls=True)
def _safe_device_str(value: Any) -> str:
s = _normalize_json_scalar(value)
if len(s) > _MAX_DEVICE_STR_LEN:
s = s[:_MAX_DEVICE_STR_LEN]
return sanitize_input(s, max_length=_MAX_DEVICE_STR_LEN, strip_controls=True)
def _coerce_float(value: Any) -> Optional[float]:
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
f = float(value)
if f != f or abs(f) > 1e6: # NaN or absurd
return None
return f
if isinstance(value, str):
try:
f = float(value.strip())
if f != f or abs(f) > 1e6:
return None
return f
except ValueError:
return None
return None
def _validate_json_template(template: str) -> Optional[str]:
"""Return error message if template uses unknown placeholders."""
for m in re.finditer(r"\{([^}]+)\}", template):
name = m.group(1).strip()
if name and name not in JSON_TEMPLATE_PLACEHOLDERS:
return f"Invalid template placeholder: {{{name}}}"
return None
def format_mqtt_weather_payload(
payload: bytes,
fmt: MqttWeatherFormatConfig,
) -> tuple[Optional[str], Optional[str]]:
"""Turn raw MQTT payload bytes into display text.
Returns:
(text, error) — exactly one is non-None.
"""
if not payload:
return None, "empty_payload"
if len(payload) > fmt.max_payload_bytes:
return None, "payload_too_large"
try:
text = payload.decode("utf-8", errors="replace")
except Exception:
return None, "decode_error"
if fmt.output_mode == "passthrough":
out = sanitize_input(
text, max_length=fmt.passthrough_max_length, strip_controls=True
)
if not out:
return None, "empty_after_sanitize"
return out, None
# json_template
tmpl_err = _validate_json_template(fmt.json_template)
if tmpl_err:
return None, tmpl_err
try:
data = json.loads(text)
except json.JSONDecodeError:
return None, "invalid_json"
if not isinstance(data, dict):
return None, "json_not_object"
if fmt.json_device_key:
if _normalize_json_scalar(data.get(fmt.json_device_key)) != fmt.json_device_value:
return None, "device_filter_mismatch"
temp_f = _coerce_float(data.get("temperature_F"))
if temp_f is None:
temp_f = _coerce_float(data.get("temperature_f"))
hum = _coerce_float(data.get("humidity"))
if hum is not None:
hum = round(hum, 2)
if hum < 0 or hum > 100:
hum = None
temp_c: Optional[float] = None
if temp_f is not None:
temp_c = (temp_f - 32.0) * 5.0 / 9.0
mapping = {
"time": _safe_time_str(data.get("time")),
"device": _safe_device_str(data.get("device")),
"temperature_f": f"{temp_f:.2f}" if temp_f is not None else "",
"temperature_c": f"{temp_c:.2f}" if temp_c is not None else "",
"humidity": f"{hum:.0f}" if hum is not None else "",
}
try:
out = fmt.json_template.format_map(mapping)
except (KeyError, ValueError, IndexError):
return None, "template_format_error"
out = sanitize_input(out, max_length=fmt.passthrough_max_length, strip_controls=True)
if not out:
return None, "empty_after_sanitize"
return out, None
class MqttWeatherCache:
"""Thread-safe last-value cache per subscribed topic."""
def __init__(self) -> None:
self._lock = threading.Lock()
self._by_topic: dict[str, tuple[bytes, float]] = {}
def update(self, topic: str, payload: bytes) -> None:
with self._lock:
self._by_topic[topic] = (payload, time.monotonic())
def get(self, topic: str) -> tuple[Optional[bytes], Optional[float]]:
with self._lock:
row = self._by_topic.get(topic)
if not row:
return None, None
return row[0], row[1]
def clear(self) -> None:
with self._lock:
self._by_topic.clear()
def mqtt_weather_display_for_topic(
topic: str,
cache: Optional[MqttWeatherCache],
fmt: MqttWeatherFormatConfig,
) -> tuple[Optional[str], Optional[str]]:
"""Read cache for topic, check staleness, return formatted line or error key."""
if cache is None:
return None, "no_cache"
payload, ts = cache.get(topic)
if payload is None or ts is None:
return None, "no_data"
age = time.monotonic() - ts
if age > fmt.stale_after_seconds:
return None, "stale"
return format_mqtt_weather_payload(payload, fmt)
@@ -28,6 +28,12 @@ except ImportError:
WXSIM_PARSER_AVAILABLE = False
WXSIMParser = None
from ...clients.mqtt_weather import (
get_mqtt_weather_topic,
load_mqtt_weather_format_config,
mqtt_weather_display_for_topic,
)
# Multiday: plain digits, 7day/7-day, or suffix form 7d/10d (min 2, max below). Open-Meteo allows up to 16 forecast days.
GWX_MULTIDAY_MAX_DAYS = 16
@@ -41,7 +47,8 @@ class GlobalWxCommand(BaseCommand):
description = "Get weather information for any global location (usage: gwx Tokyo)"
category = "weather"
cooldown_seconds = 5 # 5 second cooldown per user to prevent API abuse
requires_internet = True # Requires internet access for Open-Meteo API and geocoding
# Open-Meteo/geocoding need the network; custom MQTT/WXSIM may be LAN-only.
requires_internet = False
# Documentation
short_description = "Get weather for any global location using Open-Meteo API"
@@ -206,6 +213,34 @@ class GlobalWxCommand(BaseCommand):
self.logger.debug(f"Error getting bot location: {e}")
return None
def _get_custom_mqtt_weather_topic(self, location: Optional[str] = None) -> Optional[str]:
return get_mqtt_weather_topic(self.bot.config, location)
def _mqtt_weather_line(
self,
topic: str,
forecast_type: str,
location_name: Optional[str],
) -> str:
if forecast_type != "default":
return self.translate("commands.gwx.mqtt_forecast_not_supported")
fmt = load_mqtt_weather_format_config(self.bot.config)
cache = getattr(self.bot, "mqtt_weather_cache", None)
text, err = mqtt_weather_display_for_topic(topic, cache, fmt)
if text is not None:
if location_name:
return f"{location_name}: {text}"
return text
if err == "no_cache":
return self.translate("commands.gwx.mqtt_weather_no_subscriber")
if err in ("no_data", "empty_payload", "empty_after_sanitize"):
return self.translate("commands.gwx.mqtt_weather_no_data")
if err == "stale":
return self.translate("commands.gwx.mqtt_weather_stale")
detail = (err or "unknown").replace("_", " ")
return self.translate("commands.gwx.mqtt_weather_payload_error", detail=detail)
def _get_custom_wxsim_source(self, location: Optional[str] = None) -> Optional[str]:
"""Get custom WXSIM source URL from config.
@@ -386,8 +421,20 @@ class GlobalWxCommand(BaseCommand):
# Parse the command to extract location and forecast type
parts = content.split()
# If no location specified, check for custom WXSIM default source first
# If no location specified, check custom MQTT then WXSIM default sources
if len(parts) < 2:
mqtt_topic = self._get_custom_mqtt_weather_topic(None)
if mqtt_topic:
try:
self.record_execution(message.sender_id)
weather_data = self._mqtt_weather_line(mqtt_topic, "default", None)
await self.send_response(message, weather_data)
return True
except Exception as e:
self.logger.error(f"Error reading MQTT weather: {e}")
await self.send_response(message, self.translate("commands.gwx.error", error=str(e)))
return True
wxsim_source = self._get_custom_wxsim_source(None) # Check for default
if wxsim_source:
# Use custom WXSIM default source
@@ -484,6 +531,23 @@ class GlobalWxCommand(BaseCommand):
await self.send_response(message, self.translate('commands.gwx.usage'))
return True
# Custom MQTT before WXSIM
mqtt_topic = self._get_custom_mqtt_weather_topic(location)
if mqtt_topic:
self.logger.info(f"Using custom MQTT weather topic for location '{location}': {mqtt_topic}")
try:
self.record_execution(message.sender_id)
weather_data = self._mqtt_weather_line(mqtt_topic, forecast_type, location)
if forecast_type == "multiday":
await self._send_multiday_forecast(message, weather_data)
else:
await self.send_response(message, weather_data)
return True
except Exception as e:
self.logger.error(f"Error reading MQTT weather: {e}")
await self.send_response(message, self.translate("commands.gwx.error", error=str(e)))
return True
# Check for custom WXSIM source first (before normal geocoding)
wxsim_source = self._get_custom_wxsim_source(location)
if wxsim_source:
+74 -2
View File
@@ -39,6 +39,12 @@ except ImportError:
WXSIM_PARSER_AVAILABLE = False
WXSIMParser = None
from ..clients.mqtt_weather import (
get_mqtt_weather_topic,
load_mqtt_weather_format_config,
mqtt_weather_display_for_topic,
)
# Multiday: plain digits (e.g. 7), 7day/7-day, or suffix form 7d/10d (min 2, max below).
WX_MULTIDAY_MAX_DAYS = 16
@@ -52,7 +58,8 @@ class WxCommand(BaseCommand):
description = "Get weather information for a zip code (usage: wx 12345)"
category = "weather"
cooldown_seconds = 5 # 5 second cooldown per user to prevent API abuse
requires_internet = True # Requires internet access for NOAA API and geocoding
# NOAA/geocoding need the network, but custom WXSIM/MQTT sources may be LAN-only; check connectivity inside execute paths.
requires_internet = False
# Documentation
short_description = "Get weather for a US location using NOAA weather data"
@@ -299,6 +306,39 @@ class WxCommand(BaseCommand):
return None
def _get_custom_mqtt_weather_topic(self, location: Optional[str] = None) -> Optional[str]:
"""MQTT topic for custom.mqtt_weather.<name> (see get_mqtt_weather_topic)."""
return get_mqtt_weather_topic(self.bot.config, location)
def _mqtt_weather_line(
self,
topic: str,
forecast_type: str,
location_name: Optional[str],
) -> str:
"""Format cached MQTT payload for wx output."""
if forecast_type != "default":
return self.translate("commands.wx.mqtt_forecast_not_supported")
fmt = load_mqtt_weather_format_config(self.bot.config)
cache = getattr(self.bot, "mqtt_weather_cache", None)
text, err = mqtt_weather_display_for_topic(topic, cache, fmt)
if text is not None:
if location_name:
return f"{location_name}: {text}"
return text
return self._mqtt_weather_error_key(err)
def _mqtt_weather_error_key(self, err: Optional[str]) -> str:
if err == "no_cache":
return self.translate("commands.wx.mqtt_weather_no_subscriber")
if err in ("no_data", "empty_payload", "empty_after_sanitize"):
return self.translate("commands.wx.mqtt_weather_no_data")
if err == "stale":
return self.translate("commands.wx.mqtt_weather_stale")
detail = (err or "unknown").replace("_", " ")
return self.translate("commands.wx.mqtt_weather_payload_error", detail=detail)
def _get_wxsim_weather(self, source_url: str, forecast_type: str = "default",
num_days: int = 7, message: MeshMessage = None,
location_name: Optional[str] = None) -> str:
@@ -449,8 +489,20 @@ class WxCommand(BaseCommand):
# Track if we're using companion location (so we always show location in response)
using_companion_location = False
# If no location specified, check for custom WXSIM default source first
# If no location specified, check custom MQTT then WXSIM default sources
if len(parts) < 2:
mqtt_topic = self._get_custom_mqtt_weather_topic(None)
if mqtt_topic:
try:
self.record_execution(message.sender_id)
weather_data = self._mqtt_weather_line(mqtt_topic, "default", None)
await self.send_response(message, weather_data)
return True
except Exception as e:
self.logger.error(f"Error reading MQTT weather: {e}")
await self.send_response(message, self.translate("commands.wx.error", error=str(e)))
return True
wxsim_source = self._get_custom_wxsim_source(None) # Check for default
if wxsim_source:
# Use custom WXSIM default source
@@ -554,6 +606,26 @@ class WxCommand(BaseCommand):
await self.send_response(message, self.translate('commands.wx.usage'))
return True
# Custom MQTT before WXSIM; skip snapshot sources when user asked for NOAA alerts
if not show_full_alerts:
mqtt_topic = self._get_custom_mqtt_weather_topic(location)
if mqtt_topic:
self.logger.info(f"Using custom MQTT weather topic for location '{location}': {mqtt_topic}")
try:
self.record_execution(message.sender_id)
weather_data = self._mqtt_weather_line(
mqtt_topic, forecast_type, location
)
if forecast_type == "multiday":
await self._send_multiday_forecast(message, weather_data)
else:
await self.send_response(message, weather_data)
return True
except Exception as e:
self.logger.error(f"Error reading MQTT weather: {e}")
await self.send_response(message, self.translate("commands.wx.error", error=str(e)))
return True
# Check for custom WXSIM source first (before checking location type)
wxsim_source = self._get_custom_wxsim_source(location)
if wxsim_source:
@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""
MQTT subscriber for custom weather topics (custom.mqtt_weather.* in [Weather]).
Caches the latest payload per topic for wx / gwx commands.
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, Optional
try:
import paho.mqtt.client as mqtt
except ImportError:
mqtt = None
from ..clients.mqtt_weather import MqttWeatherCache, iter_mqtt_weather_topics
from .base_service import BaseServicePlugin
class MqttWeatherService(BaseServicePlugin):
"""Subscribe to configured MQTT weather topics and update bot.mqtt_weather_cache."""
config_section = "MqttWeather"
description = "MQTT subscriber for custom.mqtt_weather.* wx/gwx sources"
def __init__(self, bot: Any):
super().__init__(bot)
self.logger = logging.getLogger("MqttWeatherService")
self.logger.setLevel(bot.logger.level)
self._client: Any = None
self._cache: Optional[MqttWeatherCache] = None
self._topics: list[str] = []
def _parse_broker_config(self) -> Optional[dict[str, Any]]:
cfg = self.bot.config
sec = "MqttWeather"
if not cfg.has_section(sec):
return None
host = cfg.get(sec, "broker", fallback="").strip()
if not host:
self.logger.error("MqttWeather: broker hostname is required")
return None
port = cfg.getint(sec, "port", fallback=1883)
transport = cfg.get(sec, "transport", fallback="tcp").strip().lower()
ws_path = cfg.get(sec, "websocket_path", fallback="/mqtt").strip() or "/mqtt"
use_tls = cfg.getboolean(sec, "use_tls", fallback=False)
username = cfg.get(sec, "username", fallback="").strip() or None
password = cfg.get(sec, "password", fallback="").strip() or None
client_id = cfg.get(sec, "client_id", fallback="").strip() or None
qos = cfg.getint(sec, "qos", fallback=0)
if qos not in (0, 1, 2):
qos = 0
return {
"host": host,
"port": port,
"transport": transport,
"websocket_path": ws_path,
"use_tls": use_tls,
"username": username,
"password": password,
"client_id": client_id,
"qos": qos,
}
async def start(self) -> None:
if not self.enabled:
return
self._topics = iter_mqtt_weather_topics(self.bot.config)
if not self._topics:
self.logger.warning(
"MqttWeather enabled but no valid custom.mqtt_weather.* topics in [Weather]; "
"subscriber not started"
)
self._running = True
return
if mqtt is None:
self.logger.error(
"MqttWeather: paho-mqtt not installed; pip install paho-mqtt"
)
self._running = True
return
broker = self._parse_broker_config()
if not broker:
self._running = True
return
self._cache = MqttWeatherCache()
self.bot.mqtt_weather_cache = self._cache
bot_name = self.bot.config.get("Bot", "bot_name", fallback="MeshCoreBot")
client_id = broker["client_id"]
if not client_id:
safe_name = "".join(c if c.isalnum() or c == "-" else "-" for c in bot_name)
client_id = f"{safe_name}-mqtt-wx-{os.getpid()}"
topics = self._topics
qos = broker["qos"]
cache = self._cache
logger = self.logger
def on_message(_client: Any, _userdata: Any, msg: Any) -> None:
try:
payload = msg.payload
if payload is None:
return
if isinstance(payload, (bytes, bytearray)):
b = bytes(payload)
else:
b = str(payload).encode("utf-8", errors="replace")
topic_str = getattr(msg, "topic", None)
if isinstance(topic_str, bytes):
topic_str = topic_str.decode("utf-8", errors="replace")
if not topic_str:
return
cache.update(topic_str, b)
except Exception as e:
logger.debug(f"MqttWeather on_message error: {e}")
def on_connect(client: Any, userdata: Any, flags: Any, rc: int, properties: Any = None) -> None:
if rc != 0:
logger.warning(f"MqttWeather connect failed rc={rc}")
return
for t in userdata:
try:
client.subscribe(t, qos=qos)
logger.info(f"MqttWeather subscribed to {t!r} (qos={qos})")
except Exception as e:
logger.error(f"MqttWeather subscribe failed for {t!r}: {e}")
transport = broker["transport"]
try:
if transport == "websockets":
self._client = mqtt.Client(
client_id=client_id,
userdata=topics,
transport="websockets",
)
self._client.ws_set_options(path=broker["websocket_path"], headers=None)
else:
self._client = mqtt.Client(client_id=client_id, userdata=topics)
self._client.reconnect_delay_set(min_delay=1, max_delay=120)
self._client.on_connect = on_connect
self._client.on_message = on_message
if broker["use_tls"]:
import ssl
self._client.tls_set(cert_reqs=ssl.CERT_NONE)
if broker["username"]:
self._client.username_pw_set(broker["username"], broker["password"])
loop = asyncio.get_event_loop()
def do_connect() -> None:
self._client.connect(broker["host"], broker["port"], keepalive=60)
await loop.run_in_executor(None, do_connect)
self._client.loop_start()
self.logger.info(
f"MqttWeather MQTT started ({broker['host']}:{broker['port']}, {transport})"
)
except Exception as e:
self.logger.error(f"MqttWeather failed to start MQTT client: {e}")
self._client = None
self._running = True
async def stop(self) -> None:
if self._client is not None:
try:
self._client.loop_stop()
self._client.disconnect()
except Exception as e:
self.logger.debug(f"MqttWeather disconnect: {e}")
self._client = None
if self._cache is not None:
self._cache.clear()
self._cache = None
if getattr(self.bot, "mqtt_weather_cache", None) is not None:
delattr(self.bot, "mqtt_weather_cache")
self._running = False
+175
View File
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""Unit tests for modules.clients.mqtt_weather."""
import json
import time
from configparser import ConfigParser
from modules.clients.mqtt_weather import (
MqttWeatherCache,
MqttWeatherFormatConfig,
format_mqtt_weather_payload,
get_mqtt_weather_topic,
iter_mqtt_weather_topics,
mqtt_weather_display_for_topic,
validate_mqtt_weather_topic,
)
def test_validate_mqtt_topic() -> None:
assert validate_mqtt_weather_topic("home/weather") is True
assert validate_mqtt_weather_topic("") is False
assert validate_mqtt_weather_topic("a/+") is False
assert validate_mqtt_weather_topic("a/#") is False
def test_get_mqtt_weather_topic_named_and_default() -> None:
cfg = ConfigParser()
cfg.read_dict(
{
"Weather": {
"custom.mqtt_weather.default": "t/default",
"custom.mqtt_weather.patio": "t/patio",
}
}
)
assert get_mqtt_weather_topic(cfg, None) == "t/default"
assert get_mqtt_weather_topic(cfg, "patio") == "t/patio"
assert get_mqtt_weather_topic(cfg, "missing") is None
def test_iter_mqtt_weather_topics_unique() -> None:
cfg = ConfigParser()
cfg.read_dict(
{
"Weather": {
"custom.mqtt_weather.default": "same/topic",
"custom.mqtt_weather.a": "same/topic",
"custom.mqtt_weather.bad": "+/invalid",
}
}
)
topics = iter_mqtt_weather_topics(cfg)
assert topics == ["same/topic"]
def test_format_passthrough() -> None:
fmt = MqttWeatherFormatConfig(
output_mode="passthrough",
json_template="",
json_device_key="",
json_device_value="",
max_payload_bytes=1024,
passthrough_max_length=200,
stale_after_seconds=60.0,
)
raw = b"Date/Time: now\nTemp: 20 C"
text, err = format_mqtt_weather_payload(raw, fmt)
assert err is None
assert text == "Date/Time: now\nTemp: 20 C"
def test_format_json_template_and_device_filter() -> None:
fmt = MqttWeatherFormatConfig(
output_mode="json_template",
json_template="{time} | {temperature_f}F / {temperature_c}C | {humidity}%",
json_device_key="device",
json_device_value="231",
max_payload_bytes=1024,
passthrough_max_length=500,
stale_after_seconds=60.0,
)
payload = {
"device": "231",
"temperature_F": 50.0,
"humidity": 55,
"time": "2026-04-01 12:00",
}
raw = json.dumps(payload).encode()
text, err = format_mqtt_weather_payload(raw, fmt)
assert err is None
assert "50.00F" in text or "50.00" in text
assert "10.00C" in text or "10.00" in text
assert "55%" in text
payload_bad = dict(payload)
payload_bad["device"] = "999"
raw2 = json.dumps(payload_bad).encode()
text2, err2 = format_mqtt_weather_payload(raw2, fmt)
assert text2 is None
assert err2 == "device_filter_mismatch"
def test_payload_too_large() -> None:
fmt = MqttWeatherFormatConfig(
output_mode="passthrough",
json_template="",
json_device_key="",
json_device_value="",
max_payload_bytes=10,
passthrough_max_length=500,
stale_after_seconds=60.0,
)
text, err = format_mqtt_weather_payload(b"x" * 20, fmt)
assert text is None
assert err == "payload_too_large"
def test_invalid_template_placeholder() -> None:
fmt = MqttWeatherFormatConfig(
output_mode="json_template",
json_template="{time} {bogus}",
json_device_key="",
json_device_value="",
max_payload_bytes=1024,
passthrough_max_length=500,
stale_after_seconds=60.0,
)
raw = json.dumps(
{"time": "t", "temperature_F": 32, "humidity": 40, "device": "1"}
).encode()
text, err = format_mqtt_weather_payload(raw, fmt)
assert text is None
assert err is not None
assert "bogus" in err
def test_mqtt_weather_display_stale() -> None:
cache = MqttWeatherCache()
cache.update("t1", b"hello")
# force old timestamp
with cache._lock:
payload, _ts = cache._by_topic["t1"]
cache._by_topic["t1"] = (payload, time.monotonic() - 100.0)
fmt = MqttWeatherFormatConfig(
output_mode="passthrough",
json_template="",
json_device_key="",
json_device_value="",
max_payload_bytes=1024,
passthrough_max_length=500,
stale_after_seconds=30.0,
)
text, err = mqtt_weather_display_for_topic("t1", cache, fmt)
assert text is None
assert err == "stale"
def test_mqtt_weather_display_no_cache() -> None:
fmt = MqttWeatherFormatConfig(
output_mode="passthrough",
json_template="",
json_device_key="",
json_device_value="",
max_payload_bytes=1024,
passthrough_max_length=500,
stale_after_seconds=60.0,
)
text, err = mqtt_weather_display_for_topic("missing", MqttWeatherCache(), fmt)
assert text is None
assert err == "no_data"
text2, err2 = mqtt_weather_display_for_topic("missing", None, fmt)
assert text2 is None
assert err2 == "no_cache"
+11 -1
View File
@@ -79,7 +79,12 @@
"tomorrow_not_available": "Tomorrow's forecast not available",
"tomorrow_error": "Error formatting tomorrow's forecast",
"multiday_not_available": "{num_days}-day forecast not available",
"multiday_error": "Error formatting {num_days}-day forecast"
"multiday_error": "Error formatting {num_days}-day forecast",
"mqtt_forecast_not_supported": "Extended forecast is not available for MQTT weather sources",
"mqtt_weather_no_subscriber": "MQTT weather subscriber is not active (enable [MqttWeather] and custom.mqtt_weather.* topics)",
"mqtt_weather_no_data": "No MQTT weather message received for this topic yet",
"mqtt_weather_stale": "MQTT weather data is too old",
"mqtt_weather_payload_error": "MQTT weather payload error: {detail}"
},
"gwx": {
"description": "Get weather information for any global location (usage: gwx Tokyo)",
@@ -93,6 +98,11 @@
"tomorrow_error": "Error formatting tomorrow's forecast",
"multiday_not_available": "{num_days}-day forecast not available",
"multiday_error": "Error formatting {num_days}-day forecast",
"mqtt_forecast_not_supported": "Extended forecast is not available for MQTT weather sources",
"mqtt_weather_no_subscriber": "MQTT weather subscriber is not active (enable [MqttWeather] and custom.mqtt_weather.* topics)",
"mqtt_weather_no_data": "No MQTT weather message received for this topic yet",
"mqtt_weather_stale": "MQTT weather data is too old",
"mqtt_weather_payload_error": "MQTT weather payload error: {detail}",
"periods": {
"today": "Today",
"tonight": "Tonight",