Files
meshcore-bot/modules/commands/rain_command.py
agessaman 4bf60622ff refactor(rain): enhance fetch_precip_series_nws with caching support
- Added cache_ttl parameter to fetch_precip_series_nws to enable caching of results for improved performance.
- Implemented logic to reuse cached results based on location and cache expiration.
- Updated tests to ensure proper handling of cache_ttl without causing errors.
- Refactored WorldCupFastcastClient to streamline connection handling and improve readability.
2026-06-15 15:26:55 -07:00

1272 lines
55 KiB
Python

#!/usr/bin/env python3
"""
Rain nowcast command - minute-level "rain starting/stopping in ~N min" using
Open-Meteo's 15-minutely precipitation forecast. Works worldwide, no API key.
"""
import asyncio
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from ..models import MeshMessage
from ..region_capitals import REGION_DEFAULT_NOTE, region_capital_query
from ..utils import geocode_city_sync, geocode_zipcode_sync, normalize_us_state
from .base_command import BaseCommand
# WMO weather code -> precipitation "bucket". Buckets map to an emoji and a
# translatable label (commands.rain.precip_types.<bucket>). Codes not listed
# here are non-precipitating and never trigger a nowcast.
_PRECIP_BUCKETS: dict[int, str] = {
51: "drizzle", 53: "drizzle", 55: "drizzle",
56: "freezing", 57: "freezing",
61: "rain", 63: "rain", 65: "heavy_rain",
66: "freezing", 67: "freezing",
71: "snow", 73: "snow", 75: "snow", 77: "snow",
80: "showers", 81: "showers", 82: "heavy_rain",
85: "snow", 86: "snow",
95: "thunder", 96: "thunder", 99: "thunder",
}
# Emoji per bucket (leads the response line).
_BUCKET_EMOJI: dict[str, str] = {
"drizzle": "🌦️",
"rain": "🌧️",
"heavy_rain": "🌧️",
"freezing": "🧊",
"snow": "🌨️",
"showers": "🌦️",
"thunder": "⛈️",
}
# English fallbacks for precip type labels (translations override via
# commands.rain.precip_types.<bucket>; missing keys fall back to en.json).
_BUCKET_LABEL_EN: dict[str, str] = {
"drizzle": "Drizzle",
"rain": "Rain",
"heavy_rain": "Heavy rain",
"freezing": "Freezing rain",
"snow": "Snow",
"showers": "Showers",
"thunder": "Thunderstorms",
}
# Precip "families" for the !rain vs !snow modes. A command looks for its own
# family across the window first; only if none is coming does it fall back to
# the other type with a "No <type>, but ..." heads-up. Freezing rain is liquid,
# so it lives in the rain family (a !snow ice-event reads "No snow, but ...").
RAIN_FAMILY = frozenset({"drizzle", "rain", "heavy_rain", "showers", "thunder", "freezing"})
SNOW_FAMILY = frozenset({"snow"})
# Upper bound on the per-instance geocoding caches so a long-running bot that's
# queried for many distinct locations can't grow them without limit.
_GEOCODE_CACHE_CAP = 256
def _cache_put(cache: dict, key: Any, value: Any) -> None:
"""Insert into a size-capped cache, evicting the oldest entry when full.
Relies on dicts preserving insertion order (Python 3.7+).
"""
if key not in cache and len(cache) >= _GEOCODE_CACHE_CAP:
cache.pop(next(iter(cache)))
cache[key] = value
def precip_bucket_for_code(code: Optional[int]) -> Optional[str]:
"""Map a WMO weather code to a precipitation bucket, or None if not precip."""
if code is None:
return None
try:
return _PRECIP_BUCKETS.get(int(code))
except (TypeError, ValueError):
return None
def titlecase_location(text: str) -> str:
"""Tidy a user-typed location for display.
'middlesboro, ky' -> 'Middlesboro, KY'; 'paris, france' -> 'Paris, France';
'memphis' -> 'Memphis'. A 2-letter token after a comma is treated as a
state/country code and upper-cased; everything else is title-cased.
"""
parts = [p.strip() for p in text.split(",") if p.strip()]
if not parts:
return text.strip()
out = []
for i, p in enumerate(parts):
if i > 0 and len(p) == 2 and p.isalpha():
out.append(p.upper())
else:
out.append(p.title())
return ", ".join(out)
# US state / territory 2-letter codes — used to drop a trailing state from a
# typed location like "london ky" (no comma) so it doesn't become "London Ky".
US_STATE_ABBRS = frozenset({
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL",
"IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT",
"NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI",
"SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY",
"DC", "AS", "GU", "MP", "PR", "VI",
})
def city_display_name(typed_location: str, suffix: Optional[str] = None) -> str:
"""City part of a typed location for display, dropping a trailing region the
user appended without a comma.
'london ky' -> 'London'; 'paris france' -> 'Paris'; 'london, ky' -> 'London';
'oklahoma city' -> 'Oklahoma City'. `suffix` is the geocoder's authoritative
state/country (e.g. 'KY' or 'France'); when the typed text ends with it, it's
stripped so it isn't doubled into the city name. The state/country is added
back separately by the caller.
"""
head = typed_location.split(",")[0].strip()
# Drop a trailing region matching the geocoder's suffix — handles country
# names and multi-word regions ("paris france", "london united kingdom").
if suffix and head.lower().endswith(" " + suffix.lower()):
head = head[: -len(suffix)].strip()
# Drop a trailing US state abbreviation ("london ky" -> "london").
tokens = head.split()
if len(tokens) >= 2 and tokens[-1].upper() in US_STATE_ABBRS:
head = " ".join(tokens[:-1])
return titlecase_location(head)
def join_location(city: Optional[str], suffix: Optional[str]) -> str:
"""Join a city and its state/country suffix as 'City, Suffix'.
Collapses to a single name when one side is missing or the two name the same
place (case-insensitive) — so a country typed as the city ('spain' -> 'Spain',
not 'Spain, Spain') or a city-state ('Singapore', not 'Singapore, Singapore')
renders once.
"""
city = (city or "").strip()
suffix = (suffix or "").strip()
if not suffix:
return city
if not city or city.lower() == suffix.lower():
return suffix
return f"{city}, {suffix}"
def reverse_geocode_region(
bot: Any, lat: float, lon: float, *, timeout: int = 10, logger: Any = None
) -> tuple[Optional[str], Optional[str]]:
"""Reverse-geocode to (city, suffix), respecting the bot's Nominatim rate limiter.
suffix is the US state abbreviation ('TN') for US points, else the English
country name ('Japan'). Requests language='en' so country names aren't
localized. No caching (callers cache as needed). Shared by the rain command
and the Weather_Service proactive push so both label locations identically.
"""
city: Optional[str] = None
suffix: Optional[str] = None
try:
from ..utils import get_nominatim_geocoder
limiter = getattr(bot, "nominatim_rate_limiter", None)
if limiter is not None:
limiter.wait_for_request_sync()
geolocator = get_nominatim_geocoder(timeout=timeout)
# language="en" so country names come back in English ("Japan", not "日本").
result = geolocator.reverse(f"{lat}, {lon}", timeout=timeout, language="en")
if limiter is not None:
limiter.record_request()
if result is not None and hasattr(result, "raw"):
address = result.raw.get("address", {})
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or address.get("county")
or None
)
country_code = (address.get("country_code") or "").lower()
if country_code == "us":
iso = address.get("ISO3166-2-lvl4") or address.get("ISO3166-2-lvl6") or ""
if "-" in iso:
suffix = iso.rsplit("-", 1)[-1]
else:
state_abbr, _ = normalize_us_state(address.get("state", ""))
suffix = state_abbr or address.get("state") or None
else:
suffix = address.get("country") or None
except Exception as e:
if logger:
logger.debug(f"Error reverse geocoding {lat},{lon}: {e}")
return city, suffix
def precip_descriptor(bucket: Optional[str]) -> tuple[str, str]:
"""Return (emoji, English label) for a precip bucket; defaults to rain.
The label is English; localized callers use the commands.rain.precip_types
translation keys instead. Shared so the Weather_Service can build proactive
nowcast messages without duplicating the bucket tables.
"""
b = bucket or "rain"
return _BUCKET_EMOJI.get(b, "🌧️"), _BUCKET_LABEL_EN.get(b, "Rain")
def format_precip_amount(mm: Optional[float], unit: str = "in") -> Optional[str]:
"""Format an accumulated precip total (mm) for display, or None if negligible.
``unit`` "in" renders inches (the US default), anything else millimetres.
Returns None for a non-positive total so callers can omit the estimate
entirely. Inches trim trailing zeros: 0.20 -> "0.2 in", 0.05 -> "0.05 in".
"""
if mm is None or mm <= 0:
return None
if unit == "mm":
return f"{mm:.1f} mm" if mm >= 0.1 else "<0.1 mm"
inches = mm * 0.0393701
if inches < 0.01:
return "<0.01 in"
return f"{inches:.2f}".rstrip("0").rstrip(".") + " in"
def format_snow_amount(cm: Optional[float], unit: str = "in") -> Optional[str]:
"""Format a snowfall total (cm of actual snow) for display, or None if negligible.
Snow is reported as depth, not liquid equivalent (Open-Meteo's ``precipitation``
is the melted equivalent, ~7x less). ``unit`` "in" renders inches of snow (US
default), anything else centimetres. The trailing " snow" keeps it distinct
from the liquid rain estimate; 0.1 precision suits snow's coarseness.
"""
if cm is None or cm <= 0:
return None
if unit == "mm":
return f"{cm:.1f} cm snow" if cm >= 0.1 else "<0.1 cm snow"
inches = cm * 0.393701
if inches < 0.1:
return "<0.1 in snow"
return f"{inches:.1f}".rstrip("0").rstrip(".") + " in snow"
def format_amount_estimate(
bucket: Optional[str], amount_mm: Optional[float], snow_cm: Optional[float], unit: str = "in"
) -> Optional[str]:
"""The estimate string for a precip bucket, or None if negligible.
Picks the right quantity per type: snow shows depth ("3 in snow"); freezing
rain shows its liquid/glaze amount tagged "ice" ("0.1 in ice", ~1:1 with
accretion); everything else is plain liquid ("0.2 in").
"""
if bucket == "snow":
return format_snow_amount(snow_cm, unit)
amt = format_precip_amount(amount_mm, unit)
if amt and bucket == "freezing":
return f"{amt} ice"
return amt
def episode_probability_temp(series: dict, result: "NowcastResult") -> tuple[Optional[int], Optional[int]]:
"""Precip probability (%) and 2 m temperature (°F) at the episode's defining
moment — the current bucket when precipitating now, else the start bucket.
Returns (None, None) for dry_clear or when the data is missing.
"""
if result is None or result.state == "dry_clear":
return None, None
times = series.get("times") or []
probs = series.get("prob") or []
temps = series.get("temp") or []
try:
now = datetime.fromisoformat(series["now"])
except (TypeError, ValueError, KeyError):
return None, None
if result.state in ("raining_stopping", "raining_continuing"):
target = now
else: # dry_incoming: align with when precip begins
target = now + timedelta(minutes=result.minutes or 0)
best_i: Optional[int] = None
best_d: Optional[float] = None
for i, t in enumerate(times):
try:
tt = datetime.fromisoformat(t)
except (TypeError, ValueError):
continue
d = abs((tt - target).total_seconds())
if best_d is None or d < best_d:
best_d, best_i = d, i
if best_i is None:
return None, None
prob = probs[best_i] if best_i < len(probs) else None
tc = temps[best_i] if best_i < len(temps) else None
prob_pct = int(round(prob)) if prob is not None else None
temp_f = int(round(tc * 9 / 5 + 32)) if tc is not None else None
return prob_pct, temp_f
# Short-lived cache of fetched series, keyed by rounded coords + model, so the
# command and the proactive poll can reuse one fetch instead of re-hitting
# Open-Meteo. Bounded; entries expire by the caller's cache_ttl.
_SERIES_CACHE: dict[tuple, tuple[float, dict]] = {}
_SERIES_CACHE_CAP = 64
def fetch_precip_series(
session: Any,
lat: float,
lon: float,
*,
weather_model: str = "",
timeout: int = 10,
logger: Any = None,
cache_ttl: float = 0.0,
) -> Optional[dict]:
"""Fetch + normalize an Open-Meteo precipitation series using `session`.
Prefers 15-minutely data; falls back to hourly when a model doesn't provide
minutely_15. The caller owns the session's lifecycle. Returns a dict with keys
times, precip, snow, prob, temp, codes, now, current_precip, current_code,
step — or None on any error. Precipitation is requested in mm (detection is
unit-independent); snowfall in cm; prob is precip probability (%); temp is
2 m temperature (°C). When cache_ttl > 0, a fresh prior result for the same
rounded location is reused.
"""
cache_key = (round(lat, 2), round(lon, 2), weather_model)
if cache_ttl > 0:
hit = _SERIES_CACHE.get(cache_key)
if hit is not None and (time.time() - hit[0]) < cache_ttl:
return hit[1]
api_url = "https://api.open-meteo.com/v1/forecast"
variables = "precipitation,snowfall,weather_code,precipitation_probability,temperature_2m"
params: dict[str, Any] = {
"latitude": lat,
"longitude": lon,
"minutely_15": variables,
"hourly": variables,
"current": "precipitation,snowfall,weather_code,temperature_2m",
"precipitation_unit": "mm",
"timezone": "auto",
"forecast_days": 2, # cover the window even when "now" is late in the day
}
if weather_model:
params["models"] = weather_model
try:
response = session.get(api_url, params=params, timeout=timeout)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
if logger:
logger.debug(f"Open-Meteo nowcast timeout/connection error: {e}")
return None
if not response.ok:
if logger:
logger.warning(f"Open-Meteo nowcast error: HTTP {response.status_code}")
return None
data = response.json()
current = data.get("current", {}) or {}
now = current.get("time")
if not now:
return None
common = {
"now": now,
"current_precip": current.get("precipitation"),
"current_code": current.get("weather_code"),
}
series: Optional[dict] = None
m15 = data.get("minutely_15", {}) or {}
m_times = m15.get("time") or []
m_precip = m15.get("precipitation") or []
if m_times and any(p is not None for p in m_precip):
series = {
"times": m_times, "precip": m_precip,
"snow": m15.get("snowfall") or [],
"prob": m15.get("precipitation_probability") or [],
"temp": m15.get("temperature_2m") or [],
"codes": m15.get("weather_code") or [], "step": 15, **common,
}
else:
hourly = data.get("hourly", {}) or {}
h_times = hourly.get("time") or []
if not h_times:
return None
series = {
"times": h_times, "precip": hourly.get("precipitation") or [],
"snow": hourly.get("snowfall") or [],
"prob": hourly.get("precipitation_probability") or [],
"temp": hourly.get("temperature_2m") or [],
"codes": hourly.get("weather_code") or [], "step": 60, **common,
}
if cache_ttl > 0:
if len(_SERIES_CACHE) >= _SERIES_CACHE_CAP:
_SERIES_CACHE.pop(next(iter(_SERIES_CACHE)))
_SERIES_CACHE[cache_key] = (time.time(), series)
return series
# --- NWS gridpoint precip source ---------------------------------------------
# WHY THIS EXISTS: the Open-Meteo *forecast model* (fetch_precip_series, above)
# smooths away scattered, pop-up convection, so the nowcast can miss rain that is
# actually happening. Observed near Nashville (36.16, -86.78): Open-Meteo reported
# 0.00 in / ~12% precip across the next 3 h while NWS's own gridpoint showed
# 65-74% probability with measurable QPF — and thunderstorms were occurring. The
# model-based push therefore never fired. NWS's gridpoint forecast is
# forecaster-adjusted and does capture convective chances, so for US points we
# prefer it (fetch_precip_series_nws) and fall back to Open-Meteo only where NWS
# has no coverage (outside the US) or the request fails.
# NWS gridpoint "weather" type -> a representative WMO code, so precip_bucket_for_code()
# classifies the NWS series exactly like it classifies the Open-Meteo one.
_NWS_WEATHER_CODE = [
("thunderstorm", 95),
("snow", 73), ("blowing_snow", 73), ("snow_showers", 73),
("ice", 66), ("sleet", 66), ("freezing", 66), ("ice_pellets", 66),
("drizzle", 53),
("rain_showers", 81), ("showers", 81),
("rain", 63),
]
def _iso_duration_hours(dur: str) -> int:
"""Hours spanned by an ISO-8601 duration like 'PT6H', 'PT1H', 'P1DT6H' (min 1)."""
m = re.match(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?", dur or "")
if not m:
return 1
days, hours, mins = (int(g) if g else 0 for g in m.groups())
return max(1, days * 24 + hours + (1 if mins else 0))
def _nws_hourly(values: Optional[list], *, divide: bool) -> dict:
"""Map hour-start (naive UTC datetime) -> value from an NWS gridpoint property.
NWS reports each property as time-bucketed values whose validTime is an ISO
interval like '2026-06-08T12:00:00+00:00/PT6H'. ``divide`` splits an
accumulation (e.g. 6-hour QPF) evenly across its hours; otherwise the period's
value is repeated for each hour (hourly PoP, the weather-type list).
"""
out: dict = {}
for v in values or []:
try:
start_s, _, dur = (v.get("validTime") or "").partition("/")
start = datetime.fromisoformat(start_s).astimezone(timezone.utc).replace(tzinfo=None)
except (TypeError, ValueError, AttributeError):
continue
n = _iso_duration_hours(dur)
raw = v.get("value")
share = (raw / n) if (divide and raw is not None) else raw
for k in range(n):
out[start + timedelta(hours=k)] = share
return out
def _nws_weather_code(value: Any) -> Optional[int]:
"""Pick a representative WMO code from an NWS gridpoint ``weather`` value (list of segments)."""
if not value:
return None
blob = " ".join(
str(seg.get("weather") or "") for seg in value if isinstance(seg, dict)
).lower()
if not blob.strip():
return None
for needle, code in _NWS_WEATHER_CODE:
if needle in blob or needle.replace("_", " ") in blob:
return code
return 63 # precip of unknown type -> rain
def fetch_precip_series_nws(
session: Any,
lat: float,
lon: float,
*,
timeout: int = 10,
logger: Any = None,
pop_floor: int = 50,
cache_ttl: float = 0.0,
) -> Optional[dict]:
"""Build a precip nowcast series from the NWS gridpoint forecast (US only).
Returns the same shape as fetch_precip_series (times/precip/codes/now/
current_precip/current_code/step), or None when NWS has no coverage (e.g.
outside the US) so the caller can fall back to Open-Meteo.
NWS exposes 6-hour QPF (mm) and hourly PoP (%). We build an hourly series in
which each hour's precip is its QPF share, but zeroed when that hour's PoP is
below ``pop_floor`` -- so the predicted rain-start tracks the hourly
probability rather than snapping to coarse 6-hour QPF boundaries, and a trace
of QPF at a low chance is not reported as rain. Times are naive UTC ISO strings
(they only need to be self-consistent: the nowcast works on relative minutes).
When cache_ttl > 0, a fresh prior result for the same rounded location is reused
(shared bounded cache with fetch_precip_series).
"""
cache_key = (round(lat, 2), round(lon, 2), "nws")
if cache_ttl > 0:
hit = _SERIES_CACHE.get(cache_key)
if hit is not None and (time.time() - hit[0]) < cache_ttl:
return hit[1]
headers = {"User-Agent": "(meshcore-bot, weather-nowcast)", "Accept": "application/geo+json"}
try:
pts = session.get(
f"https://api.weather.gov/points/{round(lat, 4)},{round(lon, 4)}",
headers=headers, timeout=timeout,
)
if not pts.ok:
return None # no NWS coverage (outside the US) -> caller falls back to Open-Meteo
grid_url = (pts.json().get("properties") or {}).get("forecastGridData")
if not grid_url:
return None
gp = session.get(grid_url, headers=headers, timeout=timeout)
if not gp.ok:
return None
props = gp.json().get("properties") or {}
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
if logger:
logger.debug(f"NWS nowcast timeout/connection error: {e}")
return None
except (ValueError, KeyError, TypeError) as e:
if logger:
logger.debug(f"NWS nowcast parse error: {e}")
return None
qpf = _nws_hourly((props.get("quantitativePrecipitation") or {}).get("values"), divide=True)
pop = _nws_hourly((props.get("probabilityOfPrecipitation") or {}).get("values"), divide=False)
wx = _nws_hourly((props.get("weather") or {}).get("values"), divide=False)
if not qpf and not pop:
return None
now = datetime.now(timezone.utc).replace(tzinfo=None)
base = now.replace(minute=0, second=0, microsecond=0)
hours = [base + timedelta(hours=i) for i in range(0, 6)] # current hour + 5 ahead (covers the window)
times: list[str] = []
precip: list[Optional[float]] = []
codes: list[Optional[int]] = []
for h in hours:
p = pop.get(h)
q = qpf.get(h)
# Count an hour as precipitating only when NWS gives a real chance; the
# amount is its QPF share. (QPF is 6-hourly, PoP hourly -- PoP sets timing.)
amt = q if (q is not None and p is not None and p >= pop_floor) else 0.0
times.append(h.isoformat(timespec="minutes"))
precip.append(amt)
codes.append(_nws_weather_code(wx.get(h)) if amt else None)
result = {
"times": times,
"precip": precip,
"codes": codes,
"now": now.isoformat(timespec="minutes"),
"current_precip": precip[0] if precip else None,
"current_code": codes[0] if codes else None,
"step": 60,
}
if cache_ttl > 0:
if len(_SERIES_CACHE) >= _SERIES_CACHE_CAP:
_SERIES_CACHE.pop(next(iter(_SERIES_CACHE)))
_SERIES_CACHE[cache_key] = (time.time(), result)
return result
@dataclass
class NowcastResult:
"""Outcome of a precipitation nowcast analysis.
state is one of:
- "dry_clear": dry now, no precip within the window
- "dry_incoming": dry now, precip starts in `minutes`
- "raining_stopping": precip now, drops below threshold in `minutes`
- "raining_continuing": precip now, never clears within the window
"""
state: str
minutes: Optional[int] = None # until start (dry_incoming) or stop (raining_stopping)
duration_minutes: Optional[int] = None # for dry_incoming: how long the precip lasts
open_ended: bool = False # precip extends past the analysis window
bucket: Optional[str] = None # precip bucket (drizzle/rain/snow/...) when raining/incoming
amount_mm: Optional[float] = None # estimated liquid precip total (mm) over the episode within the window
snow_cm: Optional[float] = None # estimated snowfall total (cm) over the episode (snow depth, not liquid)
def _round5(minutes: float) -> int:
"""Round to the nearest 5 minutes, with a floor of 5 for positive values."""
r = int(round(minutes / 5.0) * 5)
if minutes > 0 and r < 5:
return 5
return max(0, r)
def analyze_precip_nowcast(
times: list[str],
precip: list[Optional[float]],
codes: list[Optional[int]],
now_iso: str,
*,
window_minutes: int = 120,
threshold: float = 0.1,
current_precip: Optional[float] = None,
current_code: Optional[int] = None,
snow: Optional[list[Optional[float]]] = None,
family: Optional[frozenset[str]] = None,
) -> Optional[NowcastResult]:
"""Pure nowcast analysis over a precipitation time series.
All times are naive ISO-8601 local strings (e.g. "2026-06-03T14:15") in the
same timezone as `now_iso`, so "now" is derived from the API rather than the
host clock. Returns None when the series is too sparse to analyze.
Args:
times: Bucket start times (ascending), ISO local strings.
precip: Precipitation amount per bucket, in mm. None is treated as 0.
codes: WMO weather code per bucket (parallel to `times`); used for the
precip type only.
now_iso: Current time, ISO local string (from the API's current.time).
window_minutes: How far ahead to look.
threshold: mm-per-bucket at/above which a bucket counts as precipitating.
current_precip: Optional instantaneous precip (API current.precipitation);
preferred over the bucket value for the "raining right now" decision.
current_code: Optional current WMO code, used for the precip type when
raining now.
snow: Optional snowfall per bucket, in cm (parallel to `times`). When a
snow episode is reported, snow_cm gives the depth estimate (snowfall
is the actual accumulation; precip is only its liquid equivalent).
family: Optional set of bucket names (e.g. {"snow"}); a bucket then counts
as precipitating only if its code maps into the family. None (default)
counts any precip — used to answer "is *snow* coming?" vs "rain?".
"""
if not times or not precip:
return None
def counts(amt: float, code: Optional[int]) -> bool:
"""Whether a bucket is precipitating for this query (>= threshold, and in
the requested precip family when one is given)."""
if amt < threshold:
return False
return family is None or precip_bucket_for_code(code) in family
n = min(len(times), len(precip))
try:
now = datetime.fromisoformat(now_iso)
except (TypeError, ValueError):
return None
parsed: list[tuple[datetime, float, Optional[int], float]] = []
for i in range(n):
try:
t = datetime.fromisoformat(times[i])
except (TypeError, ValueError):
continue
amt = precip[i]
amt = 0.0 if amt is None else float(amt)
code = codes[i] if i < len(codes) else None
sf = snow[i] if (snow is not None and i < len(snow)) else None
sf = 0.0 if sf is None else float(sf)
parsed.append((t, amt, code, sf))
if not parsed:
return None
parsed.sort(key=lambda x: x[0])
# Index of the bucket containing "now" (largest start time <= now).
cur_idx = -1
for i, (t, _amt, _c, _sf) in enumerate(parsed):
if t <= now:
cur_idx = i
else:
break
# Upcoming buckets strictly after now, within the window.
upcoming: list[tuple[float, float, Optional[int], float]] = [] # (mins, precip_mm, code, snow_cm)
for t, amt, code, sf in parsed[cur_idx + 1:]:
mins = (t - now).total_seconds() / 60.0
if mins <= 0:
continue
if mins > window_minutes:
break
upcoming.append((mins, amt, code, sf))
# "Precipitating now?" (of the requested family) — prefer the API's
# instantaneous value + current code, else the current bucket.
now_code = current_code if current_code is not None else (parsed[cur_idx][2] if cur_idx >= 0 else None)
if current_precip is not None:
raining_now = counts(float(current_precip), now_code)
elif cur_idx >= 0:
raining_now = counts(parsed[cur_idx][1], parsed[cur_idx][2])
else:
raining_now = False
if raining_now:
bucket = precip_bucket_for_code(now_code) or "rain"
# Accumulate the episode totals: liquid (mm) and snowfall (cm), the current
# bucket plus each upcoming bucket until precip drops below threshold.
total = max(0.0, parsed[cur_idx][1]) if cur_idx >= 0 else 0.0
total_snow = max(0.0, parsed[cur_idx][3]) if cur_idx >= 0 else 0.0
for mins, amt, code, sf in upcoming:
if not counts(amt, code):
return NowcastResult(
state="raining_stopping", minutes=_round5(mins), bucket=bucket,
amount_mm=total, snow_cm=total_snow,
)
total += amt
total_snow += sf
return NowcastResult(
state="raining_continuing", open_ended=True, bucket=bucket,
amount_mm=total, snow_cm=total_snow,
)
# Dry now (of the requested family): find the first upcoming precip bucket.
for idx, (mins, amt, code, sf) in enumerate(upcoming):
if counts(amt, code):
bucket = precip_bucket_for_code(code) or "rain"
# How long does it last, and how much falls? Walk until it drops below
# threshold, summing liquid (mm) and snowfall (cm) for the estimate.
total = amt
total_snow = sf
end_mins: Optional[float] = None
for mins2, amt2, code2, sf2 in upcoming[idx + 1:]:
if not counts(amt2, code2):
end_mins = mins2
break
total += amt2
total_snow += sf2
if end_mins is None:
return NowcastResult(
state="dry_incoming", minutes=_round5(mins), open_ended=True,
bucket=bucket, amount_mm=total, snow_cm=total_snow,
)
return NowcastResult(
state="dry_incoming",
minutes=_round5(mins),
duration_minutes=_round5(end_mins - mins),
bucket=bucket,
amount_mm=total,
snow_cm=total_snow,
)
return NowcastResult(state="dry_clear")
def decide_rain_notification(
state: str,
minutes: Optional[int],
*,
lead_minutes: int,
start_announced: bool,
end_announced: bool,
seconds_since_last_start: Optional[float] = None,
seconds_since_last_end: Optional[float] = None,
renotify_minutes: int,
announce_ending: bool = True,
) -> tuple[Optional[str], bool, bool]:
"""Decide whether to push a proactive rain notice, and which kind.
A small state machine that keeps the Weather_Service from spamming a channel
every poll. Returns ``(kind, start_announced, end_announced)`` where ``kind``
is ``None``, ``"starting"`` (rain about to begin), or ``"ending"`` (rain about
to stop); the two booleans are the caller's updated per-episode flags.
- ``dry_clear`` ends the episode and re-arms both notices.
- ``dry_incoming`` fires ``"starting"`` once when precip enters the
``lead_minutes`` window.
- ``raining_stopping`` fires ``"ending"`` once when the clear-up enters the
window (unless ``announce_ending`` is False).
- Each notice fires at most once per episode; a ``renotify_minutes`` cooldown
(tracked separately per kind) absorbs forecast flapping.
"""
if state == "dry_clear":
return (None, False, False)
if state == "dry_incoming":
if minutes is None or minutes > lead_minutes:
return (None, start_announced, end_announced) # coming, but not yet within lead
if start_announced:
return (None, True, end_announced)
if seconds_since_last_start is not None and seconds_since_last_start < renotify_minutes * 60:
return (None, start_announced, end_announced) # cooldown: hold off, stay re-armed
return ("starting", True, end_announced)
# Raining now: the "starting" moment has passed (or was missed) — mark it so a
# late "starting" never fires.
if state == "raining_continuing":
return (None, True, end_announced)
if state == "raining_stopping":
if not announce_ending or minutes is None or minutes > lead_minutes:
return (None, True, end_announced)
if end_announced:
return (None, True, True)
if seconds_since_last_end is not None and seconds_since_last_end < renotify_minutes * 60:
return (None, True, end_announced)
return ("ending", True, True)
return (None, start_announced, end_announced)
class RainCommand(BaseCommand):
"""Minute-level rain nowcast for a location (Open-Meteo 15-minutely precip)."""
name = "rain"
keywords = ["rain", "nowcast", "snow"]
description = "Rain/snow nowcast: when precip starts or stops in the next ~2h, with amount"
category = "weather"
requires_internet = True
cooldown_seconds = 5
short_description = "Rain/snow nowcast (when precip starts/stops) for a location"
usage = "rain|snow [city|zipcode|lat,lon]"
examples = ["rain", "snow", "rain seattle", "snow 98101", "rain 47.6,-122.3"]
parameters = [
{"name": "location", "description": "Optional: city, US ZIP, or lat,lon. Default: companion or bot location."}
]
def __init__(self, bot: Any) -> None:
super().__init__(bot)
self.rain_enabled = self.get_config_value("Rain_Command", "enabled", fallback=True, value_type="bool")
self.default_state = self.bot.config.get("Weather", "default_state", fallback="")
self.default_country = self.bot.config.get("Weather", "default_country", fallback="US")
self.weather_model = self.bot.config.get("Weather", "weather_model", fallback="").strip()
self.url_timeout = 10
self.window_minutes = self.get_config_value(
"Rain_Command", "window_minutes", fallback=120, value_type="int"
)
# mm-per-15min at/above which a bucket counts as precipitating.
self.threshold_mm = self.get_config_value(
"Rain_Command", "precip_threshold_mm", fallback=0.1, value_type="float"
)
# Optional precip-amount estimate appended to the nowcast line, e.g.
# "(est 0.2 in)". Unit "in" (US default) or "mm"; show_amount toggles it.
self.show_amount = self.get_config_value(
"Rain_Command", "show_amount", fallback=True, value_type="bool"
)
self.amount_unit = self.bot.config.get(
"Rain_Command", "amount_unit", fallback="in"
).strip().lower()
# Show precip probability "(…, 70%)" and a borderline-temperature tag
# "34°F" (only when ~30-38°F, where rain/snow/ice is in doubt).
self.show_probability = self.get_config_value(
"Rain_Command", "show_probability", fallback=True, value_type="bool"
)
self.show_temp = self.get_config_value(
"Rain_Command", "show_temp", fallback=True, value_type="bool"
)
# Reuse a fetched series for this many seconds (shared with the proactive
# poll); 0 disables. Short so the nowcast's "now" stays fresh.
self.cache_ttl = self.get_config_value(
"Rain_Command", "cache_seconds", fallback=300, value_type="int"
)
# Display names. The bot's own location prefers [Weather] default_city +
# default_state; other coordinates are reverse-geocoded (state for US,
# country otherwise). Results cached.
self.default_city = self.bot.config.get("Weather", "default_city", fallback="").strip()
# US ZIP -> city via Zippopotam.us. Opt-out for anyone who'd rather not
# add the external lookup; falls back to reverse geocoding when disabled.
self.zip_city_lookup = self.get_config_value(
"Rain_Command", "zip_city_lookup", fallback=True, value_type="bool"
)
self._reverse_cache: dict[str, tuple[Optional[str], Optional[str]]] = {}
self._zip_cache: dict[str, str] = {}
def can_execute(self, message: MeshMessage, skip_channel_check: bool = False) -> bool:
if not self.rain_enabled:
return False
return super().can_execute(message)
def _create_retry_session(self) -> requests.Session:
"""Session with light retry/backoff for the Open-Meteo call."""
session = requests.Session()
retry_strategy = Retry(
total=2,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"],
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=20)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _get_companion_location(self, message: MeshMessage) -> Optional[tuple[float, float]]:
"""Get companion/sender location from the contact-tracking database."""
try:
sender_pubkey = getattr(message, "sender_pubkey", None)
if not sender_pubkey:
return None
query = """
SELECT latitude, longitude
FROM complete_contact_tracking
WHERE public_key = ?
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
"""
results = self.bot.db_manager.execute_query(query, (sender_pubkey,))
if results:
row = results[0]
return (float(row["latitude"]), float(row["longitude"]))
return None
except Exception as e:
self.logger.debug(f"Error getting companion location: {e}")
return None
def _get_bot_location(self) -> Optional[tuple[float, float]]:
"""Get bot location from config ([Bot] bot_latitude, bot_longitude)."""
try:
lat = self.bot.config.getfloat("Bot", "bot_latitude", fallback=None)
lon = self.bot.config.getfloat("Bot", "bot_longitude", fallback=None)
if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180:
return (lat, lon)
return None
except Exception as e:
self.logger.debug(f"Error getting bot location: {e}")
return None
def _reverse_geocode(self, lat: float, lon: float) -> tuple[Optional[str], Optional[str]]:
"""Reverse-geocode to (city, suffix), cached. suffix is the US state
abbreviation ('TN') for US points, else the country name ('Colombia')."""
key = f"{lat:.3f},{lon:.3f}"
if key in self._reverse_cache:
return self._reverse_cache[key]
city, suffix = reverse_geocode_region(self.bot, lat, lon, timeout=self.url_timeout, logger=self.logger)
if city or suffix:
_cache_put(self._reverse_cache, key, (city, suffix))
return city, suffix
def _coordinates_to_location_string(self, lat: float, lon: float) -> Optional[str]:
"""'City, ST' (US) or 'City, Country' (non-US) from reverse geocoding."""
city, suffix = self._reverse_geocode(lat, lon)
if not city:
return None
return join_location(city, suffix)
def _suffix_for_coords(self, lat: float, lon: float) -> Optional[str]:
"""US state abbreviation or country name for coordinates (enriches a known city)."""
return self._reverse_geocode(lat, lon)[1]
def _zip_to_city_string(self, zipcode: str) -> Optional[str]:
"""US ZIP -> 'City, ST' via Zippopotam.us (free, no key, cached).
OSM/Nominatim often lacks the USPS city for a ZIP centroid (returns the
county instead), so for 5-digit US ZIPs this gives a far better name.
Returns None on failure (caller falls back to reverse geocoding).
"""
z = zipcode.strip()
if z in self._zip_cache:
return self._zip_cache[z]
name: Optional[str] = None
try:
resp = requests.get(f"https://api.zippopotam.us/us/{z}", timeout=self.url_timeout)
if resp.ok:
places = resp.json().get("places") or []
if places:
city = (places[0].get("place name") or "").strip()
st = (places[0].get("state abbreviation") or "").strip()
if city:
name = join_location(city, st)
except Exception as e:
self.logger.debug(f"Zippopotam ZIP lookup failed for {z}: {e}")
if name:
_cache_put(self._zip_cache, z, name)
return name
def _resolve_location(
self, message: MeshMessage, location: Optional[str]
) -> tuple[Optional[float], Optional[float], Optional[str], Optional[str]]:
"""Resolve to (lat, lon, location_label, error_key).
Mirrors the aurora command: no input falls back to companion location,
then a [Rain_Command] default, then the bot location. Coordinate-based
labels are reverse-geocoded to a city name for display.
"""
if not location or not location.strip():
co = self._get_companion_location(message)
if co:
label = self._coordinates_to_location_string(co[0], co[1]) or f"{co[0]:.1f},{co[1]:.1f}"
return (co[0], co[1], label, None)
default_lat = default_lon = None
if self.bot.config.has_section("Rain_Command"):
default_lat = self.bot.config.getfloat("Rain_Command", "default_lat", fallback=None)
default_lon = self.bot.config.getfloat("Rain_Command", "default_lon", fallback=None)
if default_lat is not None and default_lon is not None:
if -90 <= default_lat <= 90 and -180 <= default_lon <= 180:
label = self._coordinates_to_location_string(default_lat, default_lon) or f"{default_lat:.1f},{default_lon:.1f}"
return (default_lat, default_lon, label, None)
bot_loc = self._get_bot_location()
if bot_loc:
# Prefer the configured default city + state for the bot's own location.
if self.default_city:
suffix = self.default_state or self._suffix_for_coords(bot_loc[0], bot_loc[1])
label = join_location(self.default_city, suffix)
else:
label = self._coordinates_to_location_string(bot_loc[0], bot_loc[1]) or f"{bot_loc[0]:.1f},{bot_loc[1]:.1f}"
return (bot_loc[0], bot_loc[1], label, None)
return (None, None, None, "commands.rain.no_location")
loc = location.strip()
# Declared Optional up front: the coordinate branch assigns floats while
# the ZIP/city geocoders return Optional[float]; each is narrowed before use.
lat: Optional[float]
lon: Optional[float]
# Coordinates "lat,lon"
if re.match(r"^\s*-?\d+\.?\d*\s*,\s*-?\d+\.?\d*\s*$", loc):
try:
a, b = loc.split(",", 1)
lat, lon = float(a.strip()), float(b.strip())
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return (None, None, None, "commands.rain.error")
return (lat, lon, self._coordinates_to_location_string(lat, lon) or loc, None)
except ValueError:
return (None, None, None, "commands.rain.error")
# US ZIP (5 digits)
if re.match(r"^\s*\d{5}\s*$", loc):
lat, lon = geocode_zipcode_sync(
self.bot, loc, default_country=self.default_country, timeout=self.url_timeout
)
if lat is None or lon is None:
return (None, None, None, "commands.rain.no_location_zipcode")
# Name the ZIP "City, ST (zip)": Zippopotam first (reliable USPS city),
# then reverse geocoding, else just the ZIP.
zip_city = self._zip_to_city_string(loc) if self.zip_city_lookup else None
city = zip_city or self._coordinates_to_location_string(lat, lon)
label = f"{city} ({loc})" if city else loc
return (lat, lon, label, None)
# City
lat, lon, _ = geocode_city_sync(
self.bot,
loc,
default_state=self.default_state,
default_country=self.default_country,
include_address_info=False,
timeout=self.url_timeout,
)
if lat is None or lon is None:
return (None, None, None, "commands.rain.no_location_city")
# Keep the typed city name (more accurate than reverse geocoding for small
# towns), but append the state (US) or country (non-US) from the geocoder
# — stripping any region the user already typed so it isn't doubled.
suffix = self._suffix_for_coords(lat, lon)
typed_city = city_display_name(loc, suffix)
label = join_location(typed_city, suffix)
return (lat, lon, label, None)
def _fetch_series(self, lat: float, lon: float) -> Optional[dict]:
"""Fetch the precip series (own short-lived session).
Prefers the NWS gridpoint (US) so a "!rain"/"!snow" matches the proactive
push and reflects the forecaster-adjusted convective chances the Open-Meteo
model can miss; falls back to Open-Meteo for non-US locations (no NWS
coverage) or on any failure.
"""
session = self._create_retry_session()
try:
series = fetch_precip_series_nws(
session, lat, lon, timeout=self.url_timeout, logger=self.logger,
)
if series:
return series
return fetch_precip_series(
session, lat, lon,
weather_model=self.weather_model, timeout=self.url_timeout, logger=self.logger,
cache_ttl=self.cache_ttl,
)
finally:
session.close()
def _window_label(self) -> str:
"""Human window length, e.g. '2h' or '90min'."""
if self.window_minutes % 60 == 0:
return f"{self.window_minutes // 60}h"
return f"{self.window_minutes}min"
def _ptype(self, bucket: Optional[str]) -> str:
"""Translatable precip-type label for a bucket."""
b = bucket or "rain"
return self.translate(f"commands.rain.precip_types.{b}")
def _detail_suffix(self, result: NowcastResult, prob: Optional[int], temp_f: Optional[int]) -> str:
"""Trailing detail: ' (est 0.2 in, 70%) 34°F' — amount + probability in the
parens, plus a temperature tag only when borderline (~30-38°F)."""
parts: list[str] = []
if self.show_amount:
amt = format_amount_estimate(result.bucket, result.amount_mm, result.snow_cm, self.amount_unit)
if amt:
parts.append(f"est {amt}")
if self.show_probability and prob is not None:
parts.append(f"{prob}%")
paren = f" ({', '.join(parts)})" if parts else ""
temp = f" {temp_f}°F" if (self.show_temp and temp_f is not None and 30 <= temp_f <= 38) else ""
return paren + temp
def _format_result(
self, result: NowcastResult, location_label: str,
*, asked_word: Optional[str] = None, mismatch: bool = False,
prob: Optional[int] = None, temp_f: Optional[int] = None,
) -> str:
"""Render a NowcastResult into a single mesh-friendly line.
``asked_word`` is the precip the user asked for ("rain"/"snow"); when
``mismatch`` is set the result is the *other* type, rendered as
"No <asked>, but <actual> ..." so a !snow that finds rain still helps.
``prob``/``temp_f`` add a probability and borderline-temperature tag.
"""
emoji = _BUCKET_EMOJI.get(result.bucket or "rain", "🌧️")
if result.state == "dry_clear":
return self.translate(
"commands.rain.clear", precip=asked_word or "rain",
window=self._window_label(), location=location_label,
)
if mismatch and asked_word:
ptype = f"No {asked_word}, but {self._ptype(result.bucket).lower()}"
else:
ptype = self._ptype(result.bucket)
if result.state == "dry_incoming":
if result.open_ended or not result.duration_minutes:
extra = self.translate("commands.rain.duration_open")
else:
extra = self.translate("commands.rain.duration_for", duration=result.duration_minutes)
return self.translate(
"commands.rain.starting",
emoji=emoji, ptype=ptype, minutes=result.minutes,
location=location_label, extra=extra,
) + self._detail_suffix(result, prob, temp_f)
if result.state == "raining_stopping":
return self.translate(
"commands.rain.stopping",
emoji=emoji, ptype=ptype, minutes=result.minutes, location=location_label,
) + self._detail_suffix(result, prob, temp_f)
# raining_continuing
return self.translate(
"commands.rain.continuing",
emoji=emoji, ptype=ptype, window=self._window_label(), location=location_label,
) + self._detail_suffix(result, prob, temp_f)
def _format_changeover(self, rain_r: NowcastResult, snow_r: NowcastResult, location_label: str) -> str:
"""A rain<->snow transition line, e.g. '🌧️→🌨️ Rain now → snow in ~60min
for X' — whichever type comes first leads."""
def start(r: NowcastResult) -> int:
return 0 if r.state in ("raining_stopping", "raining_continuing") else (r.minutes or 0)
first_r, second_r = (rain_r, snow_r) if start(rain_r) <= start(snow_r) else (snow_r, rain_r)
when = self.translate("commands.rain.now") if start(first_r) == 0 else f"in ~{start(first_r)}min"
return self.translate(
"commands.rain.changeover",
from_emoji=_BUCKET_EMOJI.get(first_r.bucket or "rain", "🌧️"),
to_emoji=_BUCKET_EMOJI.get(second_r.bucket or "snow", "🌨️"),
first=self._ptype(first_r.bucket), when=when,
second=self._ptype(second_r.bucket).lower(),
minutes=start(second_r), location=location_label,
)
def get_help_text(self, message: Any = None) -> str:
"""Help tailored to the keyword asked about: 'help snow' talks snow
(depth), 'help rain'/'help nowcast' talk rain (amount). Falls back to
the rain variant when the queried word can't be determined."""
word = "rain"
content = (getattr(message, "content", "") or "").strip()
if content.startswith("!"):
content = content[1:].strip()
parts = content.split()
if len(parts) >= 2:
word = parts[1].lower()
key = "commands.rain.help_snow" if word == "snow" else "commands.rain.help_rain"
return self.translate(key)
async def execute(self, message: MeshMessage) -> bool:
content = message.content.strip()
if content.startswith("!"):
content = content[1:].strip()
parts = content.split()
location: Optional[str] = " ".join(parts[1:]).strip() if len(parts) >= 2 else None
# Which keyword triggered us sets the precip we're asked about. !snow leads
# with snow, !rain with rain; !nowcast (or anything else) has no preference.
mode = parts[0].lower() if parts else "rain"
asked_word: Optional[str] = "snow" if mode == "snow" else (None if mode == "nowcast" else "rain")
# Bare country/US state (e.g. "france", "texas") -> default to its capital
# and append a heads-up, since one centroid point isn't representative.
region_note: Optional[str] = None
cap_query = region_capital_query(location)
if cap_query:
location = cap_query
region_note = REGION_DEFAULT_NOTE
lat, lon, location_label, err_key = self._resolve_location(message, location)
if lat is None or lon is None:
region = self.default_state or self.default_country
if err_key == "commands.rain.no_location":
await self.send_response(message, self.translate("commands.rain.no_location"))
elif err_key == "commands.rain.no_location_zipcode":
await self.send_response(
message, self.translate("commands.rain.no_location_zipcode", location=location or "")
)
elif err_key == "commands.rain.no_location_city":
await self.send_response(
message,
self.translate("commands.rain.no_location_city", location=location or "", state=region),
)
else:
await self.send_response(
message, self.translate("commands.rain.error", error="Invalid location or coordinates")
)
return True
try:
self.record_execution(message.sender_id)
loop = asyncio.get_event_loop()
series = await loop.run_in_executor(None, lambda: self._fetch_series(lat, lon))
except Exception as e:
self.logger.error(f"Error fetching rain nowcast: {e}")
await self.send_response(message, self.translate("commands.rain.error_fetching"))
return True
if not series:
await self.send_response(message, self.translate("commands.rain.error_fetching"))
return True
def run(fam: Optional[frozenset[str]]) -> Optional[NowcastResult]:
return analyze_precip_nowcast(
series["times"], series["precip"], series["codes"], series["now"],
window_minutes=self.window_minutes, threshold=self.threshold_mm,
current_precip=series.get("current_precip"), current_code=series.get("current_code"),
snow=series.get("snow"), family=fam,
)
# Analyze each precip family. Both present -> a changeover line; otherwise
# the asked-for type, falling back to the other with a "No <asked>, but …".
rain_r = run(RAIN_FAMILY)
snow_r = run(SNOW_FAMILY)
if rain_r is None or snow_r is None:
await self.send_response(message, self.translate("commands.rain.error_fetching"))
return True
rain_ok = rain_r.state != "dry_clear"
snow_ok = snow_r.state != "dry_clear"
label = location_label or f"{lat:.1f},{lon:.1f}"
if rain_ok and snow_ok:
response = self._format_changeover(rain_r, snow_r, label)
else:
if asked_word == "snow":
result, mismatch = (snow_r, False) if snow_ok else ((rain_r, True) if rain_ok else (snow_r, False))
elif asked_word == "rain":
result, mismatch = (rain_r, False) if rain_ok else ((snow_r, True) if snow_ok else (rain_r, False))
else: # nowcast: no type preference
result, mismatch = (rain_r if rain_ok else snow_r), False
prob, temp_f = episode_probability_temp(series, result)
response = self._format_result(
result, label, asked_word=asked_word, mismatch=mismatch, prob=prob, temp_f=temp_f,
)
if region_note:
response = f"{response} {region_note}"
max_len = self.get_max_message_length(message)
if len(response) > max_len:
response = response[: max_len - 3] + "..."
await self.send_response(message, response)
return True