Files
meshcore-bot/modules/commands/rain_command.py
T
rlwilliamson-dev ffa5595fff feat(rain): nowcast enhancements (amounts, capitals, snow/ice, !snow, probability, changeover) + tests
Builds on the merged rain/snow nowcast (#193): ten enhancements plus
end-to-end, proactive, and live-smoke test coverage. All new behavior is
config-gated or additive, so existing deployments are unaffected by default.

Enhancements
- Precip amount estimate "(est 0.2 in)" on the command and the proactive push;
  snow shown as real depth (Open-Meteo snowfall, cm); freezing rain tagged "ice" (e.g. "0.1 in ice").
- Bare country / US state resolves to its capital with a heads-up
  (self-contained modules/region_capitals.py; no pycountry/us dependency).
- join_location() dedupes "Spain, Spain" / city-states.
- !snow alias + neutral !nowcast; each looks for its own precip family across
  the window, else falls back with a "No snow, but rain ..." cross-type line.
- Keyword-aware help (help rain / help snow).
- Precip probability shown "(..., 70%)"; the proactive incoming alert is gated
  to >= [Weather_Service] rain_nowcast_min_probability (default 50).
- Rain<->snow changeover line when the window holds both families.
- Borderline temperature tag (30-38F).
- Short-lived series cache shared by the command and the proactive poll.

Tests
- test_rain_command_e2e.py: drives RainCommand.execute() end to end and asserts
  the exact rendered reply across dry/incoming/raining, snow depth, cross-type
  mismatch, changeover, ice, temp tag, region capitals, config toggles, DM
  budget, and keyword-aware help.
- test_rain_proactive_e2e.py: drives Weather_Service._check_rain_nowcast() for
  the probability gate, snow depth, ending notice, and once-per-episode dedup.
- test_rain_live_smoke.py: opt-in (RAIN_LIVE_SMOKE=1) live Open-Meteo check for
  upstream schema drift; skipped in CI.
- Shared scaffolding in tests/unit/_rain_harness.py.

New config: [Rain_Command] show_amount/amount_unit/show_probability/show_temp/
cache_seconds/zip_city_lookup; [Weather_Service] rain_nowcast_min_probability/
rain_nowcast_cache_seconds/rain_nowcast_show_amount/rain_nowcast_amount_unit.
ruff + mypy clean.
2026-06-06 16:18:52 -05:00

1096 lines
48 KiB
Python

#!/usr/bin/env python3
"""
Rain nowcast command - minute-level "rain starting/stopping in ~N min" using
Open-Meteo's 15-minutely precipitation forecast. Works worldwide, no API key.
"""
import asyncio
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from ..models import MeshMessage
from ..region_capitals import REGION_DEFAULT_NOTE, region_capital_query
from ..utils import geocode_city_sync, geocode_zipcode_sync, normalize_us_state
from .base_command import BaseCommand
# WMO weather code -> precipitation "bucket". Every bucket has an emoji and a
# translatable label (commands.rain.precip_types.<bucket>). A code that is not
# listed here is non-precipitating and never triggers a nowcast.
_PRECIP_BUCKETS: dict[int, str] = {
    51: "drizzle",
    53: "drizzle",
    55: "drizzle",
    56: "freezing",
    57: "freezing",
    61: "rain",
    63: "rain",
    65: "heavy_rain",
    66: "freezing",
    67: "freezing",
    71: "snow",
    73: "snow",
    75: "snow",
    77: "snow",
    80: "showers",
    81: "showers",
    82: "heavy_rain",
    85: "snow",
    86: "snow",
    95: "thunder",
    96: "thunder",
    99: "thunder",
}

# Emoji that leads the response line, one per bucket.
_BUCKET_EMOJI: dict[str, str] = {
    "drizzle": "🌦️",
    "rain": "🌧️",
    "heavy_rain": "🌧️",
    "freezing": "🧊",
    "snow": "🌨️",
    "showers": "🌦️",
    "thunder": "⛈️",
}

# English fallback labels per bucket. Translations override them through
# commands.rain.precip_types.<bucket>; missing keys fall back to en.json.
_BUCKET_LABEL_EN: dict[str, str] = {
    "drizzle": "Drizzle",
    "rain": "Rain",
    "heavy_rain": "Heavy rain",
    "freezing": "Freezing rain",
    "snow": "Snow",
    "showers": "Showers",
    "thunder": "Thunderstorms",
}

# Precip "families" behind the !rain and !snow modes. A command first looks for
# its own family across the window and only falls back to the other one (with a
# "No <type>, but ..." heads-up) when none is coming. Freezing rain is liquid,
# so it belongs to the rain family (a !snow ice event reads "No snow, but ...").
RAIN_FAMILY = frozenset(
    {"drizzle", "rain", "heavy_rain", "showers", "thunder", "freezing"}
)
SNOW_FAMILY = frozenset({"snow"})
# Size limit for the per-instance geocoding caches, so a long-running bot that
# is asked about many distinct locations cannot grow them without bound.
_GEOCODE_CACHE_CAP = 256


def _cache_put(cache: dict, key: Any, value: Any) -> None:
    """Store ``value`` under ``key`` in a size-capped cache.

    When ``key`` is new and the cache already holds ``_GEOCODE_CACHE_CAP``
    entries, the oldest entry is dropped first. "Oldest" is the first key in
    iteration order, which relies on dicts preserving insertion order
    (Python 3.7+). Overwriting an existing key never evicts anything.
    """
    is_new_key = key not in cache
    if is_new_key and len(cache) >= _GEOCODE_CACHE_CAP:
        oldest = next(iter(cache))
        del cache[oldest]
    cache[key] = value
def precip_bucket_for_code(code: Optional[int]) -> Optional[str]:
    """Return the precipitation bucket for a WMO weather code.

    Returns None when ``code`` is None, cannot be converted to an int, or is
    not one of the precipitating codes in ``_PRECIP_BUCKETS``.
    """
    if code is None:
        return None
    try:
        wmo_code = int(code)
    except (TypeError, ValueError):
        return None
    return _PRECIP_BUCKETS.get(wmo_code)
def titlecase_location(text: str) -> str:
    """Tidy a user-typed location for display.

    'middlesboro, ky' -> 'Middlesboro, KY'; 'paris, france' -> 'Paris, France';
    'memphis' -> 'Memphis'; "st. john's" -> "St. John's".

    The text is split on commas and empty pieces are dropped. A 2-letter
    alphabetic piece after the first comma is treated as a state/country code
    and upper-cased; every other piece is title-cased. If no non-empty piece
    remains, the stripped input is returned unchanged.
    """
    parts = [p.strip() for p in text.split(",") if p.strip()]
    if not parts:
        return text.strip()

    out = []
    for i, p in enumerate(parts):
        if i > 0 and len(p) == 2 and p.isalpha():
            out.append(p.upper())
            continue
        # str.title() treats an apostrophe as a word boundary, so a possessive
        # comes back as "John'S". Lower-case only a lone trailing "S" that
        # follows letter + apostrophe; "O'Neill" / "D'Alene" stay capitalized.
        out.append(re.sub(r"(?<=[^\W\d_])(['’])S\b", r"\1s", p.title()))
    return ", ".join(out)
# 2-letter codes for the US states, DC and the territories. Used to drop a
# trailing state from a location typed without a comma ("london ky"), so it
# does not end up title-cased into the city name ("London Ky").
US_STATE_ABBRS = frozenset(
    (
        "AL AK AZ AR CA CO CT DE FL GA HI ID IL "
        "IN IA KS KY LA ME MD MA MI MN MS MO MT "
        "NE NV NH NJ NM NY NC ND OH OK OR PA RI "
        "SC SD TN TX UT VT VA WA WV WI WY "
        "DC AS GU MP PR VI"
    ).split()
)
def city_display_name(typed_location: str, suffix: Optional[str] = None) -> str:
    """Return the city part of a typed location, ready for display.

    Only the text before the first comma is considered. A trailing region the
    user appended without a comma is removed:

    'london ky' -> 'London'; 'paris france' -> 'Paris'; 'london, ky' -> 'London';
    'oklahoma city' -> 'Oklahoma City'.

    ``suffix`` is the geocoder's state/country for the place (e.g. 'KY' or
    'France'). When the typed text ends with it (case-insensitive, preceded by
    a space) it is stripped so it is not doubled into the city name; the caller
    adds the state/country back separately. Independently of ``suffix``, a
    final token that is a US state/territory code is dropped when at least one
    other token remains.
    """
    city = typed_location.partition(",")[0].strip()

    # Region named by the geocoder: covers country names and multi-word
    # regions ("paris france", "london united kingdom").
    if suffix:
        trailing = " " + suffix.lower()
        if city.lower().endswith(trailing):
            city = city[: -len(suffix)].strip()

    # Bare US state abbreviation at the end ("london ky" -> "london").
    words = city.split()
    if len(words) >= 2 and words[-1].upper() in US_STATE_ABBRS:
        city = " ".join(words[:-1])

    return titlecase_location(city)
def join_location(city: Optional[str], suffix: Optional[str]) -> str:
    """Combine a city and its state/country suffix into 'City, Suffix'.

    Both sides are stripped first. Only one name is returned when either side
    is empty or when both name the same place ignoring case, so a country typed
    as the city ('spain' -> 'Spain', not 'Spain, Spain') or a city-state
    ('Singapore', not 'Singapore, Singapore') is shown once. In the
    same-place case the suffix spelling is the one returned.
    """
    place = (city or "").strip()
    region = (suffix or "").strip()
    if not region:
        return place
    if place and place.lower() != region.lower():
        return f"{place}, {region}"
    return region
def reverse_geocode_region(
    bot: Any, lat: float, lon: float, *, timeout: int = 10, logger: Any = None
) -> tuple[Optional[str], Optional[str]]:
    """Reverse-geocode coordinates to ``(city, suffix)`` via Nominatim.

    ``suffix`` is the US state abbreviation ('TN') for US points and the
    country name otherwise ('Japan'); the lookup asks for language 'en' so
    country names are not localized. If ``bot`` has a
    ``nominatim_rate_limiter`` it is waited on before the request and notified
    after it. Nothing is cached here (callers cache as needed). Shared by the
    rain command and the Weather_Service proactive push so both label
    locations the same way.

    Best-effort: any exception is logged at debug level (when ``logger`` is
    given) and whatever was resolved so far is returned, possibly
    ``(None, None)``.
    """
    city: Optional[str] = None
    suffix: Optional[str] = None
    try:
        from ..utils import get_nominatim_geocoder

        limiter = getattr(bot, "nominatim_rate_limiter", None)
        if limiter is not None:
            limiter.wait_for_request_sync()
        geolocator = get_nominatim_geocoder(timeout=timeout)
        # language="en" keeps country names English ("Japan", not "日本").
        result = geolocator.reverse(f"{lat}, {lon}", timeout=timeout, language="en")
        if limiter is not None:
            limiter.record_request()

        if result is not None and hasattr(result, "raw"):
            address = result.raw.get("address", {})

            # First non-empty place field, from most to least specific.
            for field in ("city", "town", "village", "municipality", "county"):
                candidate = address.get(field)
                if candidate:
                    city = candidate
                    break

            if (address.get("country_code") or "").lower() != "us":
                suffix = address.get("country") or None
            else:
                iso = address.get("ISO3166-2-lvl4") or address.get("ISO3166-2-lvl6") or ""
                if "-" in iso:
                    # Keep what follows the last dash of the ISO 3166-2 code.
                    suffix = iso.rpartition("-")[2]
                else:
                    state_abbr, _ = normalize_us_state(address.get("state", ""))
                    suffix = state_abbr or address.get("state") or None
    except Exception as e:
        if logger:
            logger.debug(f"Error reverse geocoding {lat},{lon}: {e}")
    return city, suffix
def precip_descriptor(bucket: Optional[str]) -> tuple[str, str]:
    """Return ``(emoji, English label)`` for a precip bucket.

    A missing/empty bucket is treated as "rain", and an unknown bucket falls
    back to the rain emoji and the label "Rain". The label is English only;
    localized callers use the commands.rain.precip_types translation keys.
    Shared so the Weather_Service can build proactive nowcast messages without
    duplicating the bucket tables.
    """
    key = bucket if bucket else "rain"
    emoji = _BUCKET_EMOJI.get(key, "🌧️")
    label = _BUCKET_LABEL_EN.get(key, "Rain")
    return emoji, label
def format_precip_amount(mm: Optional[float], unit: str = "in") -> Optional[str]:
    """Format an accumulated liquid precip total (given in mm) for display.

    Returns None for a missing or non-positive total so callers can leave the
    estimate out entirely. ``unit`` "mm" renders millimetres with one decimal
    ("<0.1 mm" below 0.1); any other value renders inches with two decimals and
    trailing zeros trimmed: 0.20 -> "0.2 in", 0.05 -> "0.05 in", and
    "<0.01 in" below a hundredth of an inch.
    """
    if mm is None or mm <= 0:
        return None

    if unit == "mm":
        if mm >= 0.1:
            return f"{mm:.1f} mm"
        return "<0.1 mm"

    inches = mm * 0.0393701
    if inches < 0.01:
        return "<0.01 in"
    trimmed = f"{inches:.2f}".rstrip("0").rstrip(".")
    return trimmed + " in"
def format_snow_amount(cm: Optional[float], unit: str = "in") -> Optional[str]:
    """Format a snowfall total (cm of actual snow) for display.

    Snow is reported as depth rather than liquid equivalent (Open-Meteo's
    ``precipitation`` is the melted equivalent, ~7x less). Returns None for a
    missing or non-positive total. ``unit`` "mm" selects the metric rendering,
    which for snow is centimetres with one decimal ("<0.1 cm snow" below 0.1);
    any other value renders inches with one decimal and a trailing ".0"
    trimmed ("<0.1 in snow" below a tenth). The trailing " snow" keeps the
    value distinct from the liquid rain estimate.
    """
    if cm is None or cm <= 0:
        return None

    if unit == "mm":
        if cm >= 0.1:
            return f"{cm:.1f} cm snow"
        return "<0.1 cm snow"

    inches = cm * 0.393701
    if inches < 0.1:
        return "<0.1 in snow"
    trimmed = f"{inches:.1f}".rstrip("0").rstrip(".")
    return trimmed + " in snow"
def format_amount_estimate(
    bucket: Optional[str], amount_mm: Optional[float], snow_cm: Optional[float], unit: str = "in"
) -> Optional[str]:
    """Build the estimate string for a precip bucket, or None if negligible.

    The quantity depends on the type: "snow" shows depth from ``snow_cm``
    ("3 in snow"); "freezing" shows the liquid amount tagged "ice"
    ("0.1 in ice"); every other bucket is the plain liquid amount ("0.2 in").
    """
    if bucket == "snow":
        return format_snow_amount(snow_cm, unit)

    liquid = format_precip_amount(amount_mm, unit)
    if bucket != "freezing" or not liquid:
        return liquid
    return f"{liquid} ice"
def episode_probability_temp(series: dict, result: "NowcastResult") -> tuple[Optional[int], Optional[int]]:
    """Precip probability (%) and temperature (°F) at the episode's key moment.

    The key moment is "now" when it is already precipitating
    (raining_stopping / raining_continuing); otherwise it is "now" plus
    ``result.minutes``, i.e. when precip begins. The series entry whose time is
    closest to that moment is used (the earliest one on a tie); its ``prob``
    value is rounded as-is and its ``temp`` value is converted from °C to °F.

    Returns ``(None, None)`` when ``result`` is None or dry_clear, when
    ``series["now"]`` is missing or unparsable, or when no series time can be
    parsed. Either element alone is None when its list is too short or holds
    None at that position.
    """
    if result is None or result.state == "dry_clear":
        return None, None

    times = series.get("times") or []
    probs = series.get("prob") or []
    temps = series.get("temp") or []
    try:
        now = datetime.fromisoformat(series["now"])
    except (TypeError, ValueError, KeyError):
        return None, None

    raining = result.state in ("raining_stopping", "raining_continuing")
    # For dry_incoming, line up with the moment precip is expected to start.
    target = now if raining else now + timedelta(minutes=result.minutes or 0)

    nearest: Optional[tuple[float, int]] = None  # (seconds from target, index)
    for idx, stamp in enumerate(times):
        try:
            moment = datetime.fromisoformat(stamp)
        except (TypeError, ValueError):
            continue
        gap = abs((moment - target).total_seconds())
        if nearest is None or gap < nearest[0]:
            nearest = (gap, idx)
    if nearest is None:
        return None, None

    pick = nearest[1]
    prob = probs[pick] if pick < len(probs) else None
    temp_c = temps[pick] if pick < len(temps) else None
    prob_pct = None if prob is None else int(round(prob))
    temp_f = None if temp_c is None else int(round(temp_c * 9 / 5 + 32))
    return prob_pct, temp_f
# Short-lived cache of fetched series, keyed by rounded coords + model, so the
# command and the proactive poll can reuse one fetch instead of re-hitting
# Open-Meteo. Bounded by _SERIES_CACHE_CAP; freshness is judged on every call
# against that caller's cache_ttl. Values are (time.time() when stored, series).
_SERIES_CACHE: dict[tuple, tuple[float, dict]] = {}
_SERIES_CACHE_CAP = 64


def fetch_precip_series(
    session: Any,
    lat: float,
    lon: float,
    *,
    weather_model: str = "",
    timeout: int = 10,
    logger: Any = None,
    cache_ttl: float = 0.0,
) -> Optional[dict]:
    """Fetch + normalize an Open-Meteo precipitation series using `session`.

    Prefers 15-minutely data; falls back to hourly when the response carries no
    usable minutely_15 precipitation. The caller owns the session's lifecycle.

    Args:
        session: Object with a requests-style ``get(url, params=, timeout=)``.
        lat, lon: Coordinates to query; also rounded to 2 decimals for the
            cache key.
        weather_model: Optional Open-Meteo ``models`` value; empty omits it.
        timeout: Request timeout passed to ``session.get``.
        logger: Optional logger for debug/warning messages.
        cache_ttl: When > 0, a stored result for the same rounded location and
            model that is younger than this many seconds is returned instead of
            fetching, and a fresh fetch is stored. 0 bypasses the cache.

    Returns:
        A dict with keys times, precip, snow, prob, temp, codes, now,
        current_precip, current_code, step (15 or 60) — or None on any error:
        request failure, non-OK HTTP status, a body that is not a JSON object,
        a missing current time, or no time series. Precipitation is requested
        in mm (detection is unit-independent); snowfall in cm; prob is precip
        probability (%); temp is 2 m temperature (°C).
    """
    cache_key = (round(lat, 2), round(lon, 2), weather_model)
    if cache_ttl > 0:
        hit = _SERIES_CACHE.get(cache_key)
        if hit is not None and (time.time() - hit[0]) < cache_ttl:
            return hit[1]

    api_url = "https://api.open-meteo.com/v1/forecast"
    variables = "precipitation,snowfall,weather_code,precipitation_probability,temperature_2m"
    params: dict[str, Any] = {
        "latitude": lat,
        "longitude": lon,
        "minutely_15": variables,
        "hourly": variables,
        "current": "precipitation,snowfall,weather_code,temperature_2m",
        "precipitation_unit": "mm",
        "timezone": "auto",
        "forecast_days": 2,  # cover the window even when "now" is late in the day
    }
    if weather_model:
        params["models"] = weather_model

    try:
        response = session.get(api_url, params=params, timeout=timeout)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
        if logger:
            logger.debug(f"Open-Meteo nowcast timeout/connection error: {e}")
        return None
    except requests.exceptions.RequestException as e:
        # Any other transport-level failure (too many redirects, broken
        # chunked body, ...) is also "an error" per the contract above.
        if logger:
            logger.warning(f"Open-Meteo nowcast request failed: {e}")
        return None
    if not response.ok:
        if logger:
            logger.warning(f"Open-Meteo nowcast error: HTTP {response.status_code}")
        return None

    # A 200 with a non-JSON body (proxy/captive-portal HTML, truncated reply)
    # must not escape as an exception; JSON decode errors are ValueErrors.
    try:
        data = response.json()
    except ValueError as e:
        if logger:
            logger.warning(f"Open-Meteo nowcast returned invalid JSON: {e}")
        return None
    if not isinstance(data, dict):
        if logger:
            logger.warning("Open-Meteo nowcast returned an unexpected payload")
        return None

    current = data.get("current", {}) or {}
    now = current.get("time")
    if not now:
        return None
    common = {
        "now": now,
        "current_precip": current.get("precipitation"),
        "current_code": current.get("weather_code"),
    }

    series: Optional[dict] = None
    m15 = data.get("minutely_15", {}) or {}
    m_times = m15.get("time") or []
    m_precip = m15.get("precipitation") or []
    if m_times and any(p is not None for p in m_precip):
        series = {
            "times": m_times,
            "precip": m_precip,
            "snow": m15.get("snowfall") or [],
            "prob": m15.get("precipitation_probability") or [],
            "temp": m15.get("temperature_2m") or [],
            "codes": m15.get("weather_code") or [],
            "step": 15,
            **common,
        }
    else:
        hourly = data.get("hourly", {}) or {}
        h_times = hourly.get("time") or []
        if not h_times:
            return None
        series = {
            "times": h_times,
            "precip": hourly.get("precipitation") or [],
            "snow": hourly.get("snowfall") or [],
            "prob": hourly.get("precipitation_probability") or [],
            "temp": hourly.get("temperature_2m") or [],
            "codes": hourly.get("weather_code") or [],
            "step": 60,
            **common,
        }

    if cache_ttl > 0:
        # Drop any stale copy first: a refreshed key then becomes the newest
        # entry and does not count against the cap, so refreshing at capacity
        # no longer evicts an unrelated location.
        _SERIES_CACHE.pop(cache_key, None)
        if len(_SERIES_CACHE) >= _SERIES_CACHE_CAP:
            _SERIES_CACHE.pop(next(iter(_SERIES_CACHE)))
        _SERIES_CACHE[cache_key] = (time.time(), series)
    return series
@dataclass
class NowcastResult:
    """Outcome of a precipitation nowcast analysis.

    ``state`` is one of:
      - "dry_clear": dry now, no precip within the window
      - "dry_incoming": dry now, precip starts in ``minutes``
      - "raining_stopping": precip now, drops below threshold in ``minutes``
      - "raining_continuing": precip now, never clears within the window
    """

    state: str
    # Minutes until the start (dry_incoming) or the stop (raining_stopping).
    minutes: Optional[int] = None
    # dry_incoming only: how long the precip lasts.
    duration_minutes: Optional[int] = None
    # True when the precip extends past the analysis window.
    open_ended: bool = False
    # Precip bucket (drizzle/rain/snow/...) when raining or incoming.
    bucket: Optional[str] = None
    # Estimated liquid precip total (mm) over the episode within the window.
    amount_mm: Optional[float] = None
    # Estimated snowfall total (cm) over the episode (snow depth, not liquid).
    snow_cm: Optional[float] = None
def _round5(minutes: float) -> int:
    """Round to the nearest multiple of 5 minutes.

    Any positive input yields at least 5; zero or negative results are
    clamped to 0.
    """
    stepped = int(round(minutes / 5.0) * 5)
    if minutes > 0 and stepped < 5:
        return 5
    return stepped if stepped > 0 else 0
def analyze_precip_nowcast(
    times: list[str],
    precip: list[Optional[float]],
    codes: list[Optional[int]],
    now_iso: str,
    *,
    window_minutes: int = 120,
    threshold: float = 0.1,
    current_precip: Optional[float] = None,
    current_code: Optional[int] = None,
    snow: Optional[list[Optional[float]]] = None,
    family: Optional[frozenset[str]] = None,
) -> Optional[NowcastResult]:
    """Pure nowcast analysis over a precipitation time series.

    All times are naive ISO-8601 local strings (e.g. "2026-06-03T14:15") in the
    same timezone as `now_iso`, so "now" comes from the API rather than the
    host clock. Returns None when `times` or `precip` is empty, `now_iso`
    cannot be parsed, or no bucket time can be parsed.

    Args:
        times: Bucket start times, ISO local strings. Unparsable entries are
            skipped and the rest are sorted ascending.
        precip: Precipitation amount per bucket, in mm. None is treated as 0.
        codes: WMO weather code per bucket (parallel to `times`); used for the
            precip type and the `family` filter.
        now_iso: Current time, ISO local string (from the API's current.time).
        window_minutes: How far ahead to look.
        threshold: mm-per-bucket at/above which a bucket counts as precipitating.
        current_precip: Optional instantaneous precip (API current.precipitation);
            preferred over the bucket value for the "raining right now" decision.
        current_code: Optional current WMO code, used for the precip type when
            raining now.
        snow: Optional snowfall per bucket, in cm (parallel to `times`); summed
            into snow_cm for the reported episode (snowfall is the actual
            accumulation; precip is only its liquid equivalent).
        family: Optional set of bucket names (e.g. {"snow"}); a bucket then counts
            as precipitating only if its code maps into the family. None (default)
            counts any precip — used to answer "is *snow* coming?" vs "rain?".
    """
    if not times or not precip:
        return None

    def counts(amt: float, code: Optional[int]) -> bool:
        """True when a bucket is precipitating for this query: at or above the
        threshold and, if a family was requested, of that family."""
        if amt < threshold:
            return False
        return family is None or precip_bucket_for_code(code) in family

    def run_out(
        buckets: list[tuple[float, float, Optional[int], float]],
        liquid_mm: float,
        depth_cm: float,
    ) -> tuple[Optional[float], float, float]:
        """Add up an episode bucket by bucket until the first one that no longer
        counts. Returns (offset of that bucket or None if the episode outlasts
        `buckets`, liquid total, snowfall total)."""
        for offset, amt, code, sf in buckets:
            if not counts(amt, code):
                return offset, liquid_mm, depth_cm
            liquid_mm += amt
            depth_cm += sf
        return None, liquid_mm, depth_cm

    try:
        now = datetime.fromisoformat(now_iso)
    except (TypeError, ValueError):
        return None

    # (start time, precip mm, WMO code, snowfall cm) per parsable bucket.
    samples: list[tuple[datetime, float, Optional[int], float]] = []
    for i, (stamp, raw_amt) in enumerate(zip(times, precip)):
        try:
            when = datetime.fromisoformat(stamp)
        except (TypeError, ValueError):
            continue
        amt = 0.0 if raw_amt is None else float(raw_amt)
        code = codes[i] if i < len(codes) else None
        raw_snow = snow[i] if (snow is not None and i < len(snow)) else None
        sf = 0.0 if raw_snow is None else float(raw_snow)
        samples.append((when, amt, code, sf))
    if not samples:
        return None
    samples.sort(key=lambda sample: sample[0])

    # Bucket containing "now": the last one whose start time is <= now.
    cur_idx = -1
    while cur_idx + 1 < len(samples) and samples[cur_idx + 1][0] <= now:
        cur_idx += 1
    current = samples[cur_idx] if cur_idx >= 0 else None

    # Buckets strictly after now and inside the window, as
    # (minutes from now, precip mm, code, snowfall cm).
    upcoming: list[tuple[float, float, Optional[int], float]] = []
    for when, amt, code, sf in samples[cur_idx + 1:]:
        offset = (when - now).total_seconds() / 60.0
        if offset <= 0:
            continue
        if offset > window_minutes:
            break
        upcoming.append((offset, amt, code, sf))

    # "Precipitating now?" (of the requested family): the API's instantaneous
    # value + current code win over the current bucket when available.
    if current_code is not None:
        now_code = current_code
    else:
        now_code = current[2] if current is not None else None
    if current_precip is not None:
        raining_now = counts(float(current_precip), now_code)
    elif current is not None:
        raining_now = counts(current[1], current[2])
    else:
        raining_now = False

    if raining_now:
        bucket = precip_bucket_for_code(now_code) or "rain"
        # Episode totals start from the current bucket and grow with every
        # upcoming bucket that still counts.
        liquid = max(0.0, current[1]) if current is not None else 0.0
        depth = max(0.0, current[3]) if current is not None else 0.0
        stop_at, liquid, depth = run_out(upcoming, liquid, depth)
        if stop_at is not None:
            return NowcastResult(
                state="raining_stopping", minutes=_round5(stop_at), bucket=bucket,
                amount_mm=liquid, snow_cm=depth,
            )
        return NowcastResult(
            state="raining_continuing", open_ended=True, bucket=bucket,
            amount_mm=liquid, snow_cm=depth,
        )

    # Dry now (for the requested family): report the first upcoming episode.
    for idx, (offset, amt, code, sf) in enumerate(upcoming):
        if not counts(amt, code):
            continue
        bucket = precip_bucket_for_code(code) or "rain"
        stop_at, liquid, depth = run_out(upcoming[idx + 1:], amt, sf)
        if stop_at is None:
            return NowcastResult(
                state="dry_incoming", minutes=_round5(offset), open_ended=True,
                bucket=bucket, amount_mm=liquid, snow_cm=depth,
            )
        return NowcastResult(
            state="dry_incoming",
            minutes=_round5(offset),
            duration_minutes=_round5(stop_at - offset),
            bucket=bucket,
            amount_mm=liquid,
            snow_cm=depth,
        )
    return NowcastResult(state="dry_clear")
def decide_rain_notification(
    state: str,
    minutes: Optional[int],
    *,
    lead_minutes: int,
    start_announced: bool,
    end_announced: bool,
    seconds_since_last_start: Optional[float] = None,
    seconds_since_last_end: Optional[float] = None,
    renotify_minutes: int,
    announce_ending: bool = True,
) -> tuple[Optional[str], bool, bool]:
    """Decide whether a proactive rain notice should be pushed, and which one.

    A small state machine that keeps the Weather_Service from spamming a
    channel on every poll. Returns ``(kind, start_announced, end_announced)``:
    ``kind`` is ``None``, ``"starting"`` (rain about to begin) or ``"ending"``
    (rain about to stop), and the two booleans are the caller's updated
    per-episode flags.

    - ``dry_clear`` ends the episode and re-arms both notices.
    - ``dry_incoming`` fires ``"starting"`` once, when precip is within
      ``lead_minutes``.
    - ``raining_continuing`` fires nothing but marks the start as announced.
    - ``raining_stopping`` fires ``"ending"`` once, when the clear-up is within
      ``lead_minutes`` (never when ``announce_ending`` is False).
    - A notice is held back while the matching ``seconds_since_last_*`` is
      below ``renotify_minutes`` (tracked separately per kind), which absorbs
      forecast flapping.
    - Any other state leaves both flags as they were.
    """

    def cooling_down(elapsed: Optional[float]) -> bool:
        # True while the last notice of this kind is younger than the cooldown.
        return elapsed is not None and elapsed < renotify_minutes * 60

    if state == "dry_clear":
        return (None, False, False)

    if state == "raining_continuing":
        # Already raining: the "starting" moment has passed (or was missed), so
        # mark it to make sure a late "starting" never fires.
        return (None, True, end_announced)

    if state == "dry_incoming":
        if minutes is None or minutes > lead_minutes:
            # Coming, but not yet within the lead window.
            return (None, start_announced, end_announced)
        if start_announced:
            return (None, True, end_announced)
        if cooling_down(seconds_since_last_start):
            # Hold off, but stay armed for after the cooldown.
            return (None, start_announced, end_announced)
        return ("starting", True, end_announced)

    if state == "raining_stopping":
        # Raining now, so the start counts as announced on every path below.
        if not announce_ending or minutes is None or minutes > lead_minutes:
            return (None, True, end_announced)
        if end_announced:
            return (None, True, True)
        if cooling_down(seconds_since_last_end):
            return (None, True, end_announced)
        return ("ending", True, True)

    return (None, start_announced, end_announced)
class RainCommand(BaseCommand):
"""Minute-level rain nowcast for a location (Open-Meteo 15-minutely precip)."""
name = "rain"
keywords = ["rain", "nowcast", "snow"]
description = "Rain/snow nowcast: when precip starts or stops in the next ~2h, with amount"
category = "weather"
requires_internet = True
cooldown_seconds = 5
short_description = "Rain/snow nowcast (when precip starts/stops) for a location"
usage = "rain|snow [city|zipcode|lat,lon]"
examples = ["rain", "snow", "rain seattle", "snow 98101", "rain 47.6,-122.3"]
parameters = [
{"name": "location", "description": "Optional: city, US ZIP, or lat,lon. Default: companion or bot location."}
]
def __init__(self, bot: Any) -> None:
    """Load the command's settings from the bot config and create its caches."""
    super().__init__(bot)

    def rain_option(key: str, default: Any, kind: str) -> Any:
        # Typed [Rain_Command] option, read through the get_config_value helper.
        return self.get_config_value("Rain_Command", key, fallback=default, value_type=kind)

    config = self.bot.config

    self.rain_enabled = rain_option("enabled", True, "bool")
    self.default_state = config.get("Weather", "default_state", fallback="")
    self.default_country = config.get("Weather", "default_country", fallback="US")
    self.weather_model = config.get("Weather", "weather_model", fallback="").strip()
    self.url_timeout = 10
    self.window_minutes = rain_option("window_minutes", 120, "int")

    # mm-per-15min at/above which a bucket counts as precipitating.
    self.threshold_mm = rain_option("precip_threshold_mm", 0.1, "float")

    # Optional precip-amount estimate appended to the nowcast line, e.g.
    # "(est 0.2 in)". show_amount toggles it; the unit is "in" (US default)
    # or "mm".
    self.show_amount = rain_option("show_amount", True, "bool")
    self.amount_unit = config.get("Rain_Command", "amount_unit", fallback="in").strip().lower()

    # Precip probability "(…, 70%)" and a borderline-temperature tag "34°F"
    # (only when ~30-38°F, where rain/snow/ice is in doubt).
    self.show_probability = rain_option("show_probability", True, "bool")
    self.show_temp = rain_option("show_temp", True, "bool")

    # Seconds a fetched series may be reused (shared with the proactive
    # poll); 0 disables. Kept short so the nowcast's "now" stays fresh.
    self.cache_ttl = rain_option("cache_seconds", 300, "int")

    # Display names. The bot's own location prefers [Weather] default_city +
    # default_state; other coordinates are reverse-geocoded (state for US,
    # country otherwise). Results are cached.
    self.default_city = config.get("Weather", "default_city", fallback="").strip()

    # US ZIP -> city via Zippopotam.us. Opt-out for anyone who would rather
    # not add the external lookup; reverse geocoding is used when disabled.
    self.zip_city_lookup = rain_option("zip_city_lookup", True, "bool")

    self._reverse_cache: dict[str, tuple[Optional[str], Optional[str]]] = {}
    self._zip_cache: dict[str, str] = {}
def can_execute(self, message: MeshMessage, skip_channel_check: bool = False) -> bool:
    """Return whether this command may run for ``message``.

    Always False when the command is disabled via [Rain_Command] enabled.
    Otherwise the decision is delegated to the base class, forwarding
    ``skip_channel_check`` so a caller that asks to bypass the channel check
    is honored instead of the flag being silently dropped.
    """
    if not self.rain_enabled:
        return False
    return super().can_execute(message, skip_channel_check=skip_channel_check)
def _create_retry_session(self) -> requests.Session:
    """Build a requests session with light retry/backoff for the Open-Meteo call.

    GET requests are retried up to 2 times (backoff factor 0.3) on HTTP
    500/502/503/504; an exhausted retry returns the response rather than
    raising on status. The same adapter serves https:// and http://.
    """
    session = requests.Session()
    retries = Retry(
        total=2,
        backoff_factor=0.3,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=20)
    for scheme in ("https://", "http://"):
        session.mount(scheme, adapter)
    return session
def _get_companion_location(self, message: MeshMessage) -> Optional[tuple[float, float]]:
    """Look up the sender's (lat, lon) in the contact-tracking database.

    Uses ``message.sender_pubkey`` and takes the most recent row (by
    last_advert_timestamp, else last_heard) that has non-null, non-zero
    coordinates. Returns None when the message has no sender key, nothing
    matches, or the lookup fails (the failure is logged at debug level).
    """
    try:
        sender_pubkey = getattr(message, "sender_pubkey", None)
        if not sender_pubkey:
            return None
        query = """
            SELECT latitude, longitude
            FROM complete_contact_tracking
            WHERE public_key = ?
            AND latitude IS NOT NULL AND longitude IS NOT NULL
            AND latitude != 0 AND longitude != 0
            ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
            LIMIT 1
        """
        rows = self.bot.db_manager.execute_query(query, (sender_pubkey,))
        if not rows:
            return None
        newest = rows[0]
        return (float(newest["latitude"]), float(newest["longitude"]))
    except Exception as e:
        self.logger.debug(f"Error getting companion location: {e}")
        return None
def _get_bot_location(self) -> Optional[tuple[float, float]]:
    """Return the bot's (lat, lon) from [Bot] bot_latitude / bot_longitude.

    None when either value is missing, out of range (latitude -90..90,
    longitude -180..180), or cannot be read (logged at debug level).
    """
    try:
        config = self.bot.config
        lat = config.getfloat("Bot", "bot_latitude", fallback=None)
        lon = config.getfloat("Bot", "bot_longitude", fallback=None)
        if lat is None or lon is None:
            return None
        if -90 <= lat <= 90 and -180 <= lon <= 180:
            return (lat, lon)
        return None
    except Exception as e:
        self.logger.debug(f"Error getting bot location: {e}")
        return None
def _reverse_geocode(self, lat: float, lon: float) -> tuple[Optional[str], Optional[str]]:
    """Cached reverse geocode to (city, suffix).

    suffix is the US state abbreviation ('TN') for US points, else the
    country name ('Colombia'). The cache key is the coordinates rounded to
    3 decimals; a lookup that resolved neither part is not cached.
    """
    key = f"{lat:.3f},{lon:.3f}"
    try:
        return self._reverse_cache[key]
    except KeyError:
        pass
    city, suffix = reverse_geocode_region(
        self.bot, lat, lon, timeout=self.url_timeout, logger=self.logger
    )
    if city or suffix:
        _cache_put(self._reverse_cache, key, (city, suffix))
    return city, suffix
def _coordinates_to_location_string(self, lat: float, lon: float) -> Optional[str]:
    """Label coordinates as 'City, ST' (US) or 'City, Country' (non-US).

    Built from the reverse geocode; None when no city could be resolved.
    """
    city, suffix = self._reverse_geocode(lat, lon)
    return join_location(city, suffix) if city else None
def _suffix_for_coords(self, lat: float, lon: float) -> Optional[str]:
    """US state abbreviation or country name for coordinates.

    Used to enrich a city name that is already known.
    """
    _city, suffix = self._reverse_geocode(lat, lon)
    return suffix
def _zip_to_city_string(self, zipcode: str) -> Optional[str]:
    """US ZIP -> 'City, ST' via Zippopotam.us (free, no key, cached).

    OSM/Nominatim often lacks the USPS city for a ZIP centroid (returns the
    county instead), so for 5-digit US ZIPs this gives a far better name.

    Only a 5-digit ZIP, optionally followed by a ZIP+4 extension, is accepted;
    the 5-digit part is what gets looked up and cached. Any other input
    returns None without a network call, so user-typed text is never
    interpolated into the request URL. Returns None on failure as well (the
    caller falls back to reverse geocoding); failures are not cached.
    """
    match = re.fullmatch(r"([0-9]{5})(?:-[0-9]{4})?", zipcode.strip())
    if match is None:
        return None
    z = match.group(1)

    if z in self._zip_cache:
        return self._zip_cache[z]

    name: Optional[str] = None
    try:
        resp = requests.get(f"https://api.zippopotam.us/us/{z}", timeout=self.url_timeout)
        if resp.ok:
            places = resp.json().get("places") or []
            if places:
                # Only the first listed place is used for the label.
                city = (places[0].get("place name") or "").strip()
                st = (places[0].get("state abbreviation") or "").strip()
                if city:
                    name = join_location(city, st)
    except Exception as e:
        self.logger.debug(f"Zippopotam ZIP lookup failed for {z}: {e}")

    if name:
        _cache_put(self._zip_cache, z, name)
    return name
def _resolve_location(
self, message: MeshMessage, location: Optional[str]
) -> tuple[Optional[float], Optional[float], Optional[str], Optional[str]]:
"""Resolve to (lat, lon, location_label, error_key).
Mirrors the aurora command: no input falls back to companion location,
then a [Rain_Command] default, then the bot location. Coordinate-based
labels are reverse-geocoded to a city name for display.
"""
if not location or not location.strip():
co = self._get_companion_location(message)
if co:
label = self._coordinates_to_location_string(co[0], co[1]) or f"{co[0]:.1f},{co[1]:.1f}"
return (co[0], co[1], label, None)
default_lat = default_lon = None
if self.bot.config.has_section("Rain_Command"):
default_lat = self.bot.config.getfloat("Rain_Command", "default_lat", fallback=None)
default_lon = self.bot.config.getfloat("Rain_Command", "default_lon", fallback=None)
if default_lat is not None and default_lon is not None:
if -90 <= default_lat <= 90 and -180 <= default_lon <= 180:
label = self._coordinates_to_location_string(default_lat, default_lon) or f"{default_lat:.1f},{default_lon:.1f}"
return (default_lat, default_lon, label, None)
bot_loc = self._get_bot_location()
if bot_loc:
# Prefer the configured default city + state for the bot's own location.
if self.default_city:
suffix = self.default_state or self._suffix_for_coords(bot_loc[0], bot_loc[1])
label = join_location(self.default_city, suffix)
else:
label = self._coordinates_to_location_string(bot_loc[0], bot_loc[1]) or f"{bot_loc[0]:.1f},{bot_loc[1]:.1f}"
return (bot_loc[0], bot_loc[1], label, None)
return (None, None, None, "commands.rain.no_location")
loc = location.strip()
# Declared Optional up front: the coordinate branch assigns floats while
# the ZIP/city geocoders return Optional[float]; each is narrowed before use.
lat: Optional[float]
lon: Optional[float]
# Coordinates "lat,lon"
if re.match(r"^\s*-?\d+\.?\d*\s*,\s*-?\d+\.?\d*\s*$", loc):
try:
a, b = loc.split(",", 1)
lat, lon = float(a.strip()), float(b.strip())
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return (None, None, None, "commands.rain.error")
return (lat, lon, self._coordinates_to_location_string(lat, lon) or loc, None)
except ValueError:
return (None, None, None, "commands.rain.error")
# US ZIP (5 digits)
if re.match(r"^\s*\d{5}\s*$", loc):
lat, lon = geocode_zipcode_sync(
self.bot, loc, default_country=self.default_country, timeout=self.url_timeout
)
if lat is None or lon is None:
return (None, None, None, "commands.rain.no_location_zipcode")
# Name the ZIP "City, ST (zip)": Zippopotam first (reliable USPS city),
# then reverse geocoding, else just the ZIP.
zip_city = self._zip_to_city_string(loc) if self.zip_city_lookup else None
city = zip_city or self._coordinates_to_location_string(lat, lon)
label = f"{city} ({loc})" if city else loc
return (lat, lon, label, None)
# City
lat, lon, _ = geocode_city_sync(
self.bot,
loc,
default_state=self.default_state,
default_country=self.default_country,
include_address_info=False,
timeout=self.url_timeout,
)
if lat is None or lon is None:
return (None, None, None, "commands.rain.no_location_city")
# Keep the typed city name (more accurate than reverse geocoding for small
# towns), but append the state (US) or country (non-US) from the geocoder
# — stripping any region the user already typed so it isn't doubled.
suffix = self._suffix_for_coords(lat, lon)
typed_city = city_display_name(loc, suffix)
label = join_location(typed_city, suffix)
return (lat, lon, label, None)
def _fetch_series(self, lat: float, lon: float) -> Optional[dict]:
    """Fetch the precip series for a point through the shared fetcher.

    A retry-enabled session is created for this one call and always closed
    afterwards, whether or not the fetch raised.
    """
    http = self._create_retry_session()
    try:
        series = fetch_precip_series(
            http,
            lat,
            lon,
            weather_model=self.weather_model,
            timeout=self.url_timeout,
            logger=self.logger,
            cache_ttl=self.cache_ttl,
        )
    finally:
        http.close()
    return series
def _window_label(self) -> str:
"""Human window length, e.g. '2h' or '90min'."""
if self.window_minutes % 60 == 0:
return f"{self.window_minutes // 60}h"
return f"{self.window_minutes}min"
def _ptype(self, bucket: Optional[str]) -> str:
"""Translatable precip-type label for a bucket."""
b = bucket or "rain"
return self.translate(f"commands.rain.precip_types.{b}")
def _detail_suffix(self, result: NowcastResult, prob: Optional[int], temp_f: Optional[int]) -> str:
"""Trailing detail: ' (est 0.2 in, 70%) 34°F' — amount + probability in the
parens, plus a temperature tag only when borderline (~30-38°F)."""
parts: list[str] = []
if self.show_amount:
amt = format_amount_estimate(result.bucket, result.amount_mm, result.snow_cm, self.amount_unit)
if amt:
parts.append(f"est {amt}")
if self.show_probability and prob is not None:
parts.append(f"{prob}%")
paren = f" ({', '.join(parts)})" if parts else ""
temp = f" {temp_f}°F" if (self.show_temp and temp_f is not None and 30 <= temp_f <= 38) else ""
return paren + temp
def _format_result(
    self, result: NowcastResult, location_label: str,
    *, asked_word: Optional[str] = None, mismatch: bool = False,
    prob: Optional[int] = None, temp_f: Optional[int] = None,
) -> str:
    """Turn a NowcastResult into one mesh-friendly line.

    ``asked_word`` is the precip the user asked about ("rain"/"snow"). With
    ``mismatch`` set, the result is the *other* type and the type label is
    rendered as "No <asked>, but <actual>" so a !snow that finds rain is
    still useful. ``prob``/``temp_f`` feed the trailing detail (probability
    and borderline-temperature tag); the clear-sky line carries no detail.
    """
    icon = _BUCKET_EMOJI.get(result.bucket or "rain", "🌧️")
    state = result.state

    # Nothing in the window: a plain "clear" line, phrased for the asked type.
    if state == "dry_clear":
        return self.translate(
            "commands.rain.clear",
            precip=asked_word or "rain",
            window=self._window_label(),
            location=location_label,
        )

    type_label = self._ptype(result.bucket)
    if mismatch and asked_word:
        type_label = f"No {asked_word}, but {type_label.lower()}"

    # Pick the template and the arguments only that template needs.
    if state == "dry_incoming":
        if result.open_ended or not result.duration_minutes:
            extra = self.translate("commands.rain.duration_open")
        else:
            extra = self.translate("commands.rain.duration_for", duration=result.duration_minutes)
        template = "commands.rain.starting"
        template_args = {"minutes": result.minutes, "extra": extra}
    elif state == "raining_stopping":
        template = "commands.rain.stopping"
        template_args = {"minutes": result.minutes}
    else:
        # raining_continuing
        template = "commands.rain.continuing"
        template_args = {"window": self._window_label()}

    line = self.translate(
        template, emoji=icon, ptype=type_label, location=location_label, **template_args
    )
    return line + self._detail_suffix(result, prob, temp_f)
def _format_changeover(self, rain_r: NowcastResult, snow_r: NowcastResult, location_label: str) -> str:
    """A rain<->snow transition line, e.g. '🌧️→🌨️ Rain now → snow in ~60min
    for X' — whichever type comes first leads.

    ``rain_r`` and ``snow_r`` are the rain-family and snow-family analyses of
    the same window. When a result carries no bucket, its emoji and label
    fall back to the family it came from, so a snow result is never labelled
    as rain just because of its position in the line.
    """
    def start(r: NowcastResult) -> int:
        # Precip already falling starts "now" (0); otherwise minutes until onset.
        return 0 if r.state in ("raining_stopping", "raining_continuing") else (r.minutes or 0)

    # Ties go to rain, matching the <= comparison.
    if start(rain_r) <= start(snow_r):
        first_r, second_r = rain_r, snow_r
        first_family, second_family = "rain", "snow"
    else:
        first_r, second_r = snow_r, rain_r
        first_family, second_family = "snow", "rain"

    family_emoji = {"rain": "🌧️", "snow": "🌨️"}
    first_bucket = first_r.bucket or first_family
    second_bucket = second_r.bucket or second_family

    when = self.translate("commands.rain.now") if start(first_r) == 0 else f"in ~{start(first_r)}min"
    return self.translate(
        "commands.rain.changeover",
        from_emoji=_BUCKET_EMOJI.get(first_bucket, family_emoji[first_family]),
        to_emoji=_BUCKET_EMOJI.get(second_bucket, family_emoji[second_family]),
        first=self._ptype(first_bucket), when=when,
        second=self._ptype(second_bucket).lower(),
        minutes=start(second_r), location=location_label,
    )
def get_help_text(self, message: Any = None) -> str:
    """Return help tailored to the keyword being asked about.

    The second word of the message (after an optional leading '!') selects
    the variant: 'help snow' gives the snow help, anything else — including
    a missing message or a single-word message — gives the rain help.
    """
    text = (getattr(message, "content", "") or "").strip()
    if text.startswith("!"):
        text = text[1:].strip()
    tokens = text.split()
    topic = tokens[1].lower() if len(tokens) >= 2 else "rain"
    if topic == "snow":
        return self.translate("commands.rain.help_snow")
    return self.translate("commands.rain.help_rain")
async def execute(self, message: MeshMessage) -> bool:
    """Handle a rain / snow / nowcast request and send a single reply.

    Steps: parse the keyword and optional location, swap a bare country or
    US state for its capital (appending a heads-up note), resolve the
    location, fetch the precip series in the default executor so the event
    loop is not blocked, analyze the rain and snow families, then render
    either a changeover line (both present) or the asked-for type with a
    cross-type fallback. The reply is truncated to the message budget.

    Always returns True; every failure path sends a translated error reply.
    """
    content = message.content.strip()
    if content.startswith("!"):
        content = content[1:].strip()
    parts = content.split()
    location: Optional[str] = " ".join(parts[1:]).strip() if len(parts) >= 2 else None
    # Which keyword triggered us sets the precip we're asked about. !snow leads
    # with snow, !rain with rain; !nowcast (or anything else) has no preference.
    mode = parts[0].lower() if parts else "rain"
    asked_word: Optional[str] = "snow" if mode == "snow" else (None if mode == "nowcast" else "rain")
    # Bare country/US state (e.g. "france", "texas") -> default to its capital
    # and append a heads-up, since one centroid point isn't representative.
    region_note: Optional[str] = None
    cap_query = region_capital_query(location)
    if cap_query:
        location = cap_query
        region_note = REGION_DEFAULT_NOTE
    lat, lon, location_label, err_key = self._resolve_location(message, location)
    if lat is None or lon is None:
        region = self.default_state or self.default_country
        if err_key == "commands.rain.no_location":
            await self.send_response(message, self.translate("commands.rain.no_location"))
        elif err_key == "commands.rain.no_location_zipcode":
            await self.send_response(
                message, self.translate("commands.rain.no_location_zipcode", location=location or "")
            )
        elif err_key == "commands.rain.no_location_city":
            await self.send_response(
                message,
                self.translate("commands.rain.no_location_city", location=location or "", state=region),
            )
        else:
            await self.send_response(
                message, self.translate("commands.rain.error", error="Invalid location or coordinates")
            )
        return True
    try:
        self.record_execution(message.sender_id)
        # Inside a coroutine the running loop is the one to use; the blocking
        # HTTP fetch runs in the default executor with its args passed directly.
        loop = asyncio.get_running_loop()
        series = await loop.run_in_executor(None, self._fetch_series, lat, lon)
    except Exception as e:
        self.logger.error(f"Error fetching rain nowcast: {e}")
        await self.send_response(message, self.translate("commands.rain.error_fetching"))
        return True
    if not series:
        await self.send_response(message, self.translate("commands.rain.error_fetching"))
        return True

    def run(fam: Optional[frozenset[str]]) -> Optional[NowcastResult]:
        # Same series and window for every family; only the family filter differs.
        return analyze_precip_nowcast(
            series["times"], series["precip"], series["codes"], series["now"],
            window_minutes=self.window_minutes, threshold=self.threshold_mm,
            current_precip=series.get("current_precip"), current_code=series.get("current_code"),
            snow=series.get("snow"), family=fam,
        )

    # Analyze each precip family. Both present -> a changeover line; otherwise
    # the asked-for type, falling back to the other with a "No <asked>, but …".
    rain_r = run(RAIN_FAMILY)
    snow_r = run(SNOW_FAMILY)
    if rain_r is None or snow_r is None:
        await self.send_response(message, self.translate("commands.rain.error_fetching"))
        return True
    rain_ok = rain_r.state != "dry_clear"
    snow_ok = snow_r.state != "dry_clear"
    label = location_label or f"{lat:.1f},{lon:.1f}"
    if rain_ok and snow_ok:
        response = self._format_changeover(rain_r, snow_r, label)
    else:
        if asked_word == "snow":
            result, mismatch = (snow_r, False) if snow_ok else ((rain_r, True) if rain_ok else (snow_r, False))
        elif asked_word == "rain":
            result, mismatch = (rain_r, False) if rain_ok else ((snow_r, True) if snow_ok else (rain_r, False))
        else:  # nowcast: no type preference
            result, mismatch = (rain_r if rain_ok else snow_r), False
        prob, temp_f = episode_probability_temp(series, result)
        response = self._format_result(
            result, label, asked_word=asked_word, mismatch=mismatch, prob=prob, temp_f=temp_f,
        )
    if region_note:
        response = f"{response} {region_note}"
    # Keep the reply within the per-message budget, marking the cut with an ellipsis.
    max_len = self.get_max_message_length(message)
    if len(response) > max_len:
        response = response[: max_len - 3] + "..."
    await self.send_response(message, response)
    return True