Files
meshcore-bot/modules/service_plugins/weather_service.py
agessaman e037046400 Enhance weather service forecast output with high/low temperature display
- Added today's high and low temperatures to the weather forecast output, improving clarity for users.
- Updated formatting to include labels for high and low temperatures, ensuring better readability of the forecast information.
2026-03-25 20:26:22 -07:00

1660 lines
74 KiB
Python

#!/usr/bin/env python3
"""
Weather Service for MeshCore Bot
Provides scheduled weather forecasts and alert monitoring
"""
import asyncio
import time
import schedule
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List, Set
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import ephem
import xml.dom.minidom
import re
import json
import math
# Try to import MQTT client (use paho-mqtt like packet capture service)
try:
import paho.mqtt.client as mqtt
MQTT_AVAILABLE = True
except ImportError:
MQTT_AVAILABLE = False
mqtt = None
from .base_service import BaseServicePlugin
from ..url_shortener import shorten_url
class WeatherService(BaseServicePlugin):
"""Weather service providing scheduled forecasts and alert monitoring.
Manages daily weather forecasts, polls for NOAA weather alerts, and
monitors lightning strikes via MQTT (Blitzortung).
"""
config_section = 'Weather_Service'
description = "Scheduled weather forecasts and alert monitoring"
def __init__(self, bot: Any):
"""Initialize weather service.
Args:
bot: The bot instance.
"""
super().__init__(bot)
# Configuration
self.weather_alarm_time = self.bot.config.get('Weather_Service', 'weather_alarm', fallback='6:00')
self.my_position_lat = self.bot.config.getfloat('Weather_Service', 'my_position_lat', fallback=None)
self.my_position_lon = self.bot.config.getfloat('Weather_Service', 'my_position_lon', fallback=None)
self.weather_channel = self.bot.config.get('Weather_Service', 'weather_channel', fallback='general')
self.alerts_channel = self.bot.config.get('Weather_Service', 'alerts_channel', fallback='general')
# Polling intervals (in milliseconds, converted to seconds)
self.blitz_collection_interval = self.bot.config.getint('Weather_Service', 'blitz_collection_interval', fallback=600000) / 1000.0
self.poll_weather_alerts_interval = self.bot.config.getint('Weather_Service', 'poll_weather_alerts_interval', fallback=600000) / 1000.0
# Storm detection area (optional)
self.blitz_area = None
if self.bot.config.has_option('Weather_Service', 'blitz_area_min_lat'):
self.blitz_area = {
'min_lat': self.bot.config.getfloat('Weather_Service', 'blitz_area_min_lat'),
'min_lon': self.bot.config.getfloat('Weather_Service', 'blitz_area_min_lon'),
'max_lat': self.bot.config.getfloat('Weather_Service', 'blitz_area_max_lat'),
'max_lon': self.bot.config.getfloat('Weather_Service', 'blitz_area_max_lon'),
}
# Validate position
if self.my_position_lat is None or self.my_position_lon is None:
self.logger.warning("Weather service requires my_position_lat and my_position_lon in config")
self.enabled = False
return
# Create retry-enabled session for API calls
self.api_session = self._create_retry_session()
# Get temperature/wind units from config (for Open-Meteo)
self.temperature_unit = self.bot.config.get('Weather', 'temperature_unit', fallback='fahrenheit')
self.wind_speed_unit = self.bot.config.get('Weather', 'wind_speed_unit', fallback='mph')
self.precipitation_unit = self.bot.config.get('Weather', 'precipitation_unit', fallback='inch')
# Track seen alerts to avoid duplicates
self.seen_alert_ids: Set[str] = set()
# Track last alert check time to only send new alerts
self.last_alert_check_time: Optional[float] = None
# Background tasks
self._alerts_task: Optional[asyncio.Task] = None
self._forecast_task: Optional[asyncio.Task] = None
self._lightning_task: Optional[asyncio.Task] = None
self._forecast_job = None # Store schedule job for cleanup
self._running = False
# Track recent lightning strikes to avoid duplicates
self.recent_lightning_strikes: Set[str] = set()
# Lightning detection via MQTT
self.blitz_buffer: List[Dict[str, Any]] = []
self.seen_blitz_keys: Set[str] = set()
self.mqtt_client: Optional[Any] = None # paho.mqtt.client.Client
self.mqtt_task: Optional[asyncio.Task] = None
# Check if using sunrise/sunset
self.use_sunrise_sunset = self.weather_alarm_time.lower() in ['sunrise', 'sunset']
# Cache for location name (to avoid repeated reverse geocoding)
self._cached_location_name: Optional[str] = None
self.logger.info(f"Weather service initialized: position=({self.my_position_lat}, {self.my_position_lon}), alarm={self.weather_alarm_time}")
def _create_retry_session(self) -> requests.Session:
"""Create a requests session with retry logic for API calls.
Returns:
requests.Session: Configured session with retry adapter.
"""
session = requests.Session()
retry_strategy = Retry(
total=2,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"],
raise_on_status=False
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=20
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
def _get_sunrise_sunset_time(self, event: str) -> Optional[datetime]:
"""Get sunrise or sunset time for configured position.
Args:
event: 'sunrise' or 'sunset'.
Returns:
Optional[datetime]: Datetime object with the next sunrise/sunset time, or None on error.
"""
try:
obs = ephem.Observer()
obs.date = datetime.now(timezone.utc)
obs.lat = str(self.my_position_lat)
obs.lon = str(self.my_position_lon)
sun = ephem.Sun()
sun.compute(obs)
if event.lower() == 'sunrise':
next_event = ephem.localtime(obs.next_rising(sun))
elif event.lower() == 'sunset':
next_event = ephem.localtime(obs.next_setting(sun))
else:
return None
return next_event
except Exception as e:
self.logger.error(f"Error calculating {event}: {e}")
return None
async def start(self) -> None:
"""Start the weather service.
Initializes scheduled tasks for forecasts, alert polling, and lightning detection.
"""
if not self.enabled:
self.logger.info("Weather service is disabled, not starting")
return
self._running = True
self.logger.info("Starting weather service")
# Setup scheduled daily forecast
if self.use_sunrise_sunset:
# For sunrise/sunset, use a background task that reschedules daily
self._forecast_task = asyncio.create_task(self._sunrise_sunset_forecast_loop())
else:
# For fixed times, use schedule library
self._setup_daily_forecast()
# Start background tasks
self._alerts_task = asyncio.create_task(self._poll_weather_alerts_loop())
# Start lightning detection if area is configured
if self.blitz_area and MQTT_AVAILABLE:
self._lightning_task = asyncio.create_task(self._poll_lightning_loop())
self.mqtt_task = asyncio.create_task(self._connect_blitzortung_mqtt())
else:
self._lightning_task = None
self.mqtt_task = None
if self.blitz_area and not MQTT_AVAILABLE:
self.logger.warning("Lightning detection configured but paho-mqtt not available")
self.logger.info("Weather service started")
async def stop(self) -> None:
"""Stop the weather service.
cancels all background tasks and closes connections.
"""
self._running = False
self.logger.info("Stopping weather service")
# Cancel background tasks
if self._alerts_task:
self._alerts_task.cancel()
try:
await self._alerts_task
except asyncio.CancelledError:
pass
if self._forecast_task:
self._forecast_task.cancel()
try:
await self._forecast_task
except asyncio.CancelledError:
pass
if self._lightning_task:
self._lightning_task.cancel()
try:
await self._lightning_task
except asyncio.CancelledError:
pass
if self.mqtt_task:
self.mqtt_task.cancel()
try:
await self.mqtt_task
except asyncio.CancelledError:
pass
if self.mqtt_client:
try:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
except Exception:
pass
# Clear scheduled forecast
# Note: schedule library doesn't have a direct way to cancel a specific job
# The job will simply not execute if _running is False
# If needed, we could use schedule.clear() but that would clear ALL scheduled jobs
pass
self.logger.info("Weather service stopped")
def _setup_daily_forecast(self) -> None:
"""Setup daily weather forecast schedule for fixed times.
Configures the schedule library to trigger _send_daily_forecast at the configured time.
"""
try:
# Parse time (format: "HH:MM" or "H:MM")
if ':' in self.weather_alarm_time:
hour, minute = map(int, self.weather_alarm_time.split(':'))
else:
# Assume format "HHMM"
hour = int(self.weather_alarm_time[:2])
minute = int(self.weather_alarm_time[2:])
schedule_time = f"{hour:02d}:{minute:02d}"
self._forecast_job = schedule.every().day.at(schedule_time).do(self._send_daily_forecast)
self.logger.info(f"Scheduled daily weather forecast at {schedule_time}")
except Exception as e:
self.logger.error(f"Error setting up daily forecast schedule: {e}")
async def _sunrise_sunset_forecast_loop(self) -> None:
"""Background task for sunrise/sunset-based forecasts.
Calculates daily sunrise/sunset times and schedules the forecast accordingly.
"""
event_type = self.weather_alarm_time.lower()
self.logger.info(f"Starting {event_type}-based forecast loop")
while self._running:
try:
# Calculate next sunrise/sunset time
next_event = self._get_sunrise_sunset_time(event_type)
if not next_event:
self.logger.error(f"Failed to calculate {event_type} time, retrying in 1 hour")
await asyncio.sleep(3600)
continue
# Calculate seconds until next event
now = datetime.now()
if next_event.tzinfo:
# Convert to local time if timezone-aware
if now.tzinfo:
next_event = next_event.astimezone(now.tzinfo).replace(tzinfo=None)
else:
next_event = next_event.replace(tzinfo=None)
wait_seconds = (next_event - now).total_seconds()
# If the event already passed today, wait until tomorrow's calculation
if wait_seconds < 0:
# Wait until after midnight, then recalculate
wait_seconds = 3600 # Wait 1 hour and recalculate
self.logger.debug(f"{event_type} already passed today, waiting to recalculate")
else:
self.logger.info(f"Next {event_type} at {next_event.strftime('%H:%M:%S')}, waiting {wait_seconds:.0f} seconds")
# Wait until the event time (or 1 hour if already passed)
await asyncio.sleep(max(1, min(wait_seconds, 86400))) # Cap at 24 hours
# Check if we should send forecast (only if we waited for the actual event)
if wait_seconds > 0 and wait_seconds < 86400:
await self._send_daily_forecast_async()
# Small delay after sending to avoid immediate recalculation
await asyncio.sleep(60)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in {event_type} forecast loop: {e}")
await asyncio.sleep(3600) # Wait 1 hour on error
def _send_daily_forecast(self) -> None:
"""Send daily weather forecast (called by schedule library).
Wrapper to run the async forecast sender from the synchronous schedule job.
"""
if not self._running:
return
self.logger.info(f"📅 Sending daily weather forecast at {datetime.now().strftime('%H:%M:%S')}")
# Use the main event loop if available, otherwise create a new one
# This prevents deadlock when the main loop is already running
if hasattr(self.bot, 'main_event_loop') and self.bot.main_event_loop and self.bot.main_event_loop.is_running():
# Schedule coroutine in the running main event loop
future = asyncio.run_coroutine_threadsafe(
self._send_daily_forecast_async(),
self.bot.main_event_loop
)
# Wait for completion (with timeout to prevent indefinite blocking)
try:
future.result(timeout=120) # 2 minute timeout for weather forecast
except Exception as e:
self.logger.error(f"Error sending daily weather forecast: {e}")
else:
# Fallback: create new event loop if main loop not available
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._send_daily_forecast_async())
async def _send_daily_forecast_async(self) -> None:
"""Send daily weather forecast (async implementation).
Fetches the forecast and sends it to the configured channel.
Uses Open-Meteo for weather data and manages its own error logging.
"""
try:
# Get weather forecast
forecast_text = await self._get_weather_forecast()
if forecast_text and forecast_text != "Error fetching weather data":
# Send to configured channel
await self.bot.command_manager.send_channel_message(
self.weather_channel,
f"🌤️ Daily Weather: {forecast_text}"
)
self.logger.info(f"Daily weather forecast sent to {self.weather_channel}")
else:
self.logger.warning("Failed to get weather forecast for daily update")
except Exception as e:
self.logger.error(f"Error sending daily weather forecast: {e}")
async def _get_weather_forecast(self) -> str:
"""Get weather forecast for configured position using Open-Meteo API.
Returns:
str: Formatted forecast string or error message.
"""
try:
# Open-Meteo API endpoint
api_url = "https://api.open-meteo.com/v1/forecast"
params = {
'latitude': self.my_position_lat,
'longitude': self.my_position_lon,
'current': 'temperature_2m,relative_humidity_2m,apparent_temperature,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m',
'daily': 'weather_code,temperature_2m_max,temperature_2m_min,precipitation_probability_max,wind_speed_10m_max',
'temperature_unit': self.temperature_unit,
'wind_speed_unit': self.wind_speed_unit,
'precipitation_unit': self.precipitation_unit,
'timezone': 'auto',
'forecast_days': 2 # Today and tomorrow
}
try:
response = self.api_session.get(api_url, params=params, timeout=10)
if not response.ok:
self.logger.warning(f"Error fetching weather from Open-Meteo: HTTP {response.status_code}")
return "Error fetching weather data"
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
self.logger.warning(f"Timeout/connection error fetching weather: {e}")
return "Error fetching weather data"
data = response.json()
# Extract current conditions
current = data.get('current', {})
daily = data.get('daily', {})
if not current or not daily:
return "No forecast data available"
# Current conditions
temp = int(current.get('temperature_2m', 0))
weather_code = current.get('weather_code', 0)
wind_speed = int(current.get('wind_speed_10m', 0))
wind_direction = self._degrees_to_direction(current.get('wind_direction_10m', 0))
# Get weather description and emoji
weather_desc = self._get_weather_description(weather_code)
weather_emoji = self._get_weather_emoji(weather_code)
# Temperature unit symbol
temp_symbol = "°F" if self.temperature_unit == 'fahrenheit' else "°C"
# Get location name (cached to avoid repeated API calls)
if self._cached_location_name is None:
try:
from ..utils import rate_limited_nominatim_reverse, format_location_for_display
coordinates_str = f"{self.my_position_lat}, {self.my_position_lon}"
location = await rate_limited_nominatim_reverse(self.bot, coordinates_str, timeout=5)
if location and hasattr(location, 'raw'):
address = location.raw.get('address', {})
city = (address.get('city') or
address.get('town') or
address.get('village') or
address.get('municipality') or
address.get('suburb') or
None)
state = (address.get('state') or
address.get('province') or
address.get('region') or
None)
country = address.get('country')
location_name = format_location_for_display(city, state, country)
if not location_name:
location_name = f"{self.my_position_lat:.2f},{self.my_position_lon:.2f}"
else:
location_name = f"{self.my_position_lat:.2f},{self.my_position_lon:.2f}"
self._cached_location_name = location_name
except Exception as e:
self.logger.debug(f"Error reverse geocoding location: {e}")
location_name = f"{self.my_position_lat:.2f},{self.my_position_lon:.2f}"
self._cached_location_name = location_name
else:
location_name = self._cached_location_name
# Format current forecast
forecast_text = f"{location_name}: {weather_emoji}{weather_desc} {temp}{temp_symbol}"
if wind_speed > 0:
wind_dir_str = f"{wind_direction}" if wind_direction else ""
forecast_text += f" {wind_dir_str}{wind_speed}{self.wind_speed_unit}"
today_high = int(daily['temperature_2m_max'][0])
today_low = int(daily['temperature_2m_min'][0])
# Show high/low with labels to make it clear
forecast_text += f" | H:{today_high}{temp_symbol} L:{today_low}{temp_symbol}"
# Add tomorrow's forecast
daily_times = daily.get('time', [])
daily_codes = daily.get('weather_code', [])
daily_max = daily.get('temperature_2m_max', [])
daily_min = daily.get('temperature_2m_min', [])
if len(daily_times) > 1 and len(daily_codes) > 1:
tomorrow_code = daily_codes[1]
tomorrow_max = int(daily_max[1]) if len(daily_max) > 1 else None
tomorrow_min = int(daily_min[1]) if len(daily_min) > 1 else None
tomorrow_desc = self._get_weather_description(tomorrow_code)
tomorrow_emoji = self._get_weather_emoji(tomorrow_code)
if tomorrow_max is not None:
if tomorrow_min is not None and tomorrow_min != tomorrow_max:
forecast_text += f" | Tomorrow: {tomorrow_emoji}{tomorrow_desc} {tomorrow_min}-{tomorrow_max}{temp_symbol}"
else:
forecast_text += f" | Tomorrow: {tomorrow_emoji}{tomorrow_desc} {tomorrow_max}{temp_symbol}"
return forecast_text
except Exception as e:
self.logger.error(f"Error getting weather forecast: {e}")
import traceback
self.logger.debug(traceback.format_exc())
return "Error fetching weather data"
def _degrees_to_direction(self, degrees: float) -> str:
"""Convert wind direction in degrees to compass direction.
Args:
degrees: Wind direction in degrees (0-360).
Returns:
str: Compass direction (e.g., 'N', 'NE', 'SW').
"""
if degrees is None:
return ""
directions = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE',
'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW']
index = int((degrees + 11.25) / 22.5) % 16
return directions[index]
def _get_weather_description(self, code: int) -> str:
"""Get weather description from WMO weather code.
Args:
code: WMO weather code integer.
Returns:
str: Human-readable weather description.
"""
# WMO Weather interpretation codes (WW)
codes = {
0: "Clear", 1: "Mostly Clear", 2: "Partly Cloudy", 3: "Overcast",
45: "Foggy", 48: "Depositing Rime Fog",
51: "Light Drizzle", 53: "Moderate Drizzle", 55: "Dense Drizzle",
56: "Light Freezing Drizzle", 57: "Dense Freezing Drizzle",
61: "Slight Rain", 63: "Moderate Rain", 65: "Heavy Rain",
66: "Light Freezing Rain", 67: "Heavy Freezing Rain",
71: "Slight Snow", 73: "Moderate Snow", 75: "Heavy Snow",
77: "Snow Grains", 80: "Slight Rain Showers", 81: "Moderate Rain Showers",
82: "Violent Rain Showers", 85: "Slight Snow Showers", 86: "Heavy Snow Showers",
95: "Thunderstorm", 96: "Thunderstorm w/Hail", 99: "Severe Thunderstorm"
}
return codes.get(code, "Unknown")
def _get_weather_emoji(self, code: int) -> str:
"""Get weather emoji from WMO weather code.
Args:
code: WMO weather code integer.
Returns:
str: Emoji character representing the weather.
"""
if code == 0:
return "☀️"
elif code in [1, 2]:
return "🌤️"
elif code == 3:
return "☁️"
elif code in [45, 48]:
return "🌫️"
elif code in [51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82]:
return "🌧️"
elif code in [71, 73, 75, 77, 85, 86]:
return "❄️"
elif code in [95, 96, 99]:
return "⛈️"
else:
return "🌤️"
async def _poll_weather_alerts_loop(self) -> None:
"""Background task to poll for weather alerts.
Runs periodically based on configured interval.
"""
self.logger.info(f"Starting weather alerts polling (interval: {self.poll_weather_alerts_interval}s)")
while self._running:
try:
await self._check_weather_alerts()
await asyncio.sleep(self.poll_weather_alerts_interval)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in weather alerts polling loop: {e}")
await asyncio.sleep(60) # Wait 1 minute on error before retrying
async def _check_weather_alerts(self) -> None:
"""Check for new weather alerts (US-only via NOAA API).
Note: Open-Meteo doesn't provide weather alerts, so we use NOAA which is US-only.
For international locations, alerts will not be available.
Only sends alerts that were issued since the last check.
"""
try:
# Get current time for this check
current_check_time = time.time()
# Calculate time window: only alerts issued since last check (or last polling interval if first check)
if self.last_alert_check_time is None:
# First check: only get alerts from the last polling interval
time_window_start = current_check_time - self.poll_weather_alerts_interval
else:
# Subsequent checks: only get alerts since last check
time_window_start = self.last_alert_check_time
# Round coordinates
lat_rounded = round(self.my_position_lat, 4)
lon_rounded = round(self.my_position_lon, 4)
# NOAA alerts API (US-only)
alert_url = f"https://api.weather.gov/alerts/active.atom?point={lat_rounded},{lon_rounded}"
try:
alert_data = self.api_session.get(alert_url, timeout=10)
if not alert_data.ok:
self.logger.debug(f"Error fetching alerts: HTTP {alert_data.status_code}")
return
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
self.logger.debug(f"Timeout/connection error fetching alerts: {e}")
return
# Parse ATOM feed with full metadata extraction (same as wx_command)
alerts = []
alertxml = xml.dom.minidom.parseString(alert_data.text)
for entry in alertxml.getElementsByTagName("entry"):
try:
# Get alert ID
alert_id_elem = entry.getElementsByTagName("id")
if not alert_id_elem or not alert_id_elem[0].childNodes:
continue
alert_id = alert_id_elem[0].childNodes[0].nodeValue
# Skip if we've already seen this alert
if alert_id in self.seen_alert_ids:
continue
# Get entry updated timestamp (most reliable - when alert was last updated/issued)
entry_updated_time = None
updated_elem = entry.getElementsByTagName("updated")
if updated_elem and updated_elem[0].childNodes:
updated_str = updated_elem[0].childNodes[0].nodeValue
entry_updated_time = self._parse_iso_time(updated_str)
# Extract full alert metadata (same logic as wx_command)
alert_dict = self._parse_alert_entry(entry, alert_id)
if not alert_dict:
continue
# Determine alert issued time (prefer entry updated time, then effective time)
alert_issued_time = entry_updated_time
if alert_issued_time is None:
alert_issued_time = self._parse_alert_time(alert_dict.get('effective', ''))
if alert_issued_time is None:
# If we can't parse any time, use current time as fallback
# This means we'll send it, but it's better than missing new alerts
alert_issued_time = current_check_time
self.logger.debug(f"Could not parse time for alert {alert_id}, using current time")
# Only include alerts issued since last check
if alert_issued_time >= time_window_start:
alerts.append(alert_dict)
self.seen_alert_ids.add(alert_id)
self.logger.debug(f"New alert {alert_id} issued at {datetime.fromtimestamp(alert_issued_time)}")
else:
# Alert is older than our window, mark as seen but don't send
self.seen_alert_ids.add(alert_id)
self.logger.debug(f"Skipping old alert {alert_id} (issued {datetime.fromtimestamp(alert_issued_time)} before time window start {datetime.fromtimestamp(time_window_start)})")
except Exception as e:
self.logger.debug(f"Error parsing alert entry: {e}")
continue
# Send new alerts with compact formatting
for alert in alerts:
try:
# Format alert using compact formatter (same as wx_command)
alert_text = await self._format_alert_compact(alert, include_details=True)
await self.bot.command_manager.send_channel_message(
self.alerts_channel,
alert_text
)
self.logger.info(f"Weather alert sent: {alert.get('title', 'Unknown')}")
# Small delay between alerts
await asyncio.sleep(2)
except Exception as e:
self.logger.error(f"Error sending weather alert: {e}")
# Update last check time
self.last_alert_check_time = current_check_time
# Clean up old alert IDs (keep last 100)
if len(self.seen_alert_ids) > 100:
self.seen_alert_ids = set(list(self.seen_alert_ids)[-100:])
except Exception as e:
self.logger.error(f"Error checking weather alerts: {e}")
async def _connect_blitzortung_mqtt(self) -> None:
"""Connect to Blitzortung MQTT broker and subscribe to lightning data.
Maintains a connection to the MQTT broker for real-time lightning strikes.
"""
if not self.blitz_area or not MQTT_AVAILABLE:
return
broker_host = "blitzortung.ha.sed.pl"
broker_port = 1883
topic = "blitzortung/1.1/#"
self.logger.info(f"Connecting to Blitzortung MQTT broker: {broker_host}:{broker_port}")
while self._running:
try:
# Create paho-mqtt client
client_id = f"meshcore_weather_{int(time.time())}"
client = mqtt.Client(client_id=client_id)
self.mqtt_client = client
# Set up message callback
def on_message(client, userdata, msg):
try:
# Decode message
payload = msg.payload.decode('utf-8')
blitz_data = json.loads(payload)
# Check if strike is within our area
lat = blitz_data.get('lat')
lon = blitz_data.get('lon')
if lat is None or lon is None:
return
if (self.blitz_area['min_lat'] <= lat <= self.blitz_area['max_lat'] and
self.blitz_area['min_lon'] <= lon <= self.blitz_area['max_lon']):
# Schedule async processing
asyncio.create_task(self._handle_lightning_strike(blitz_data))
except json.JSONDecodeError:
self.logger.debug("Invalid JSON in lightning MQTT message")
except Exception as e:
self.logger.debug(f"Error processing lightning MQTT message: {e}")
client.on_message = on_message
# Connect and subscribe (non-blocking to avoid blocking event loop)
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None, client.connect, broker_host, broker_port, 60)
except Exception as connect_error:
# Connection failed, but don't block - will retry on next cycle
self.logger.debug(f"Initial connect() call failed (non-blocking): {connect_error}")
raise # Re-raise to trigger retry logic
# Subscribe is non-blocking, but wrap it anyway for consistency
try:
client.subscribe(topic)
except Exception as subscribe_error:
self.logger.debug(f"Subscribe() call failed: {subscribe_error}")
raise
client.loop_start()
self.logger.info(f"Connected to Blitzortung MQTT, subscribed to {topic}")
# Keep connection alive
while self._running:
await asyncio.sleep(1)
if not client.is_connected():
self.logger.warning("Blitzortung MQTT disconnected, reconnecting...")
break
client.loop_stop()
client.disconnect()
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in Blitzortung MQTT connection: {e}")
if self._running:
self.logger.info("Reconnecting to Blitzortung MQTT in 30 seconds...")
await asyncio.sleep(30)
async def _handle_lightning_strike(self, blitz_data: Dict[str, Any]) -> None:
"""Handle a single lightning strike from MQTT.
Calculates distance and adds to buffer if within range.
Args:
blitz_data: Dictionary containing lightning strike data.
"""
lat = blitz_data.get('lat')
lon = blitz_data.get('lon')
if lat is None or lon is None:
return
# Calculate heading and distance from bot position
heading, distance = self._calculate_heading_and_distance(
self.my_position_lat, self.my_position_lon, lat, lon
)
# Create bucket key (same as original: heading|distance/10)
distance_bucket = int(distance / 10)
key = f"{heading}|{distance_bucket}"
# Add to buffer
self.blitz_buffer.append({
'key': key,
'heading': heading,
'distance': distance,
'lat': lat,
'lon': lon,
'timestamp': blitz_data.get('time', time.time())
})
def _calculate_heading_and_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> tuple:
"""Calculate heading and distance between two points (same as original implementation).
Args:
lat1: Latitude of point 1.
lon1: Longitude of point 1.
lat2: Latitude of point 2.
lon2: Longitude of point 2.
Returns:
tuple: (heading_degrees, distance_km)
"""
# Convert to radians
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
dlon_rad = math.radians(lon2 - lon1)
# Calculate distance using Haversine formula
a = math.sin((lat2_rad - lat1_rad) / 2)**2 + \
math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon_rad / 2)**2
c = 2 * math.asin(math.sqrt(a))
distance_km = 6371 * c # Earth radius in km
# Calculate bearing/heading
y = math.sin(dlon_rad) * math.cos(lat2_rad)
x = math.cos(lat1_rad) * math.sin(lat2_rad) - \
math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(dlon_rad)
heading_rad = math.atan2(y, x)
heading_deg = (math.degrees(heading_rad) + 360) % 360
return (int(heading_deg), distance_km)
async def _poll_lightning_loop(self) -> None:
"""Background task to aggregate and report lightning strikes.
Periodically processes the lightning buffer and sends alerts.
"""
self.logger.info(f"Starting lightning aggregation (interval: {self.blitz_collection_interval}s)")
while self._running:
try:
await self._process_lightning_buffer()
await asyncio.sleep(self.blitz_collection_interval)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in lightning aggregation loop: {e}")
await asyncio.sleep(60) # Wait 1 minute on error before retrying
async def _process_lightning_buffer(self) -> None:
"""Process buffered lightning strikes and send alerts if threshold met.
Groups strikes by location bucket and sends alerts if count exceeds threshold.
"""
if not self.blitz_buffer:
return
# Count strikes by bucket key
counter = {}
for blitz in self.blitz_buffer:
key = blitz['key']
counter[key] = counter.get(key, 0) + 1
# Check each bucket
for key, count in counter.items():
# Only alert if 10+ strikes in bucket and we haven't seen this bucket before
if count >= 10 and key not in self.seen_blitz_keys:
# Find a representative strike from this bucket
bucket_strikes = [b for b in self.blitz_buffer if b['key'] == key]
if not bucket_strikes:
continue
data = bucket_strikes[0]
heading = data['heading']
distance = data['distance']
# Get compass direction name
compass_name = self._heading_to_compass(heading)
# Try to geocode location (optional, may fail)
location_name = await self._geocode_location(data['lat'], data['lon'])
# Format message
if location_name:
message = f"🌩️ {location_name} ({int(distance)}km {compass_name})"
else:
message = f"🌩️ Lightning activity ({int(distance)}km {compass_name})"
await self.bot.command_manager.send_channel_message(
self.alerts_channel,
message
)
self.logger.info(f"Lightning alert sent: {message}")
# Mark this bucket as seen
self.seen_blitz_keys.add(key)
# Small delay between alerts
await asyncio.sleep(2)
# Clear buffer
self.blitz_buffer = []
# Clean up old seen keys (keep last 1000)
if len(self.seen_blitz_keys) > 1000:
self.seen_blitz_keys = set(list(self.seen_blitz_keys)[-1000:])
def _heading_to_compass(self, heading: int) -> str:
"""Convert heading in degrees to compass direction name.
Args:
heading: Heading in degrees.
Returns:
str: Compass direction abbreviation (e.g., 'N', 'NW').
"""
compass_points = [
'N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE',
'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'
]
index = int((heading + 11.25) / 22.5) % 16
return compass_points[index]
async def _geocode_location(self, lat: float, lon: float) -> Optional[str]:
"""Geocode coordinates to location name (optional, may return None).
Args:
lat: Latitude.
lon: Longitude.
Returns:
Optional[str]: City/town name or None if lookup fails.
"""
try:
# Use reverse geocoding if available in utils
from ..utils import rate_limited_nominatim_reverse_sync
location = rate_limited_nominatim_reverse_sync(self.bot, f"{lat}, {lon}", timeout=5)
if location:
# Extract city/town name
if isinstance(location, dict):
return location.get('city') or location.get('town') or location.get('village') or None
return str(location)
except Exception:
pass
return None
def _parse_alert_entry(self, entry: Any, alert_id: str) -> Optional[Dict[str, Any]]:
"""Parse alert XML entry and extract full metadata (same logic as wx_command).
Args:
entry: XML DOM entry element.
alert_id: Alert ID string.
Returns:
Optional[Dict[str, Any]]: Alert dict with event, event_type, severity, expires, office, etc., or None on error.
"""
try:
# Extract title
title_elem = entry.getElementsByTagName("title")
title = title_elem[0].childNodes[0].nodeValue if title_elem and title_elem[0].childNodes else ""
if not title:
return None
# Extract link URL (ATOM feeds have <link> elements)
# Prefer HTML links over CAP XML links
link_url = ""
html_link_url = ""
cap_link_url = ""
link_elems = entry.getElementsByTagName("link")
for link_elem in link_elems:
href = ""
if link_elem.hasAttribute("href"):
href = link_elem.getAttribute("href")
elif link_elem.childNodes and link_elem.firstChild:
href = link_elem.firstChild.nodeValue
if not href:
continue
# Check link type and rel attributes
link_type = link_elem.getAttribute("type") or ""
link_rel = link_elem.getAttribute("rel") or ""
# Prefer HTML links
if "text/html" in link_type or link_rel == "alternate":
html_link_url = href
# Track CAP XML links as fallback
elif "cap+xml" in link_type or href.endswith(".cap") or "/alerts/" in href:
cap_link_url = href
# Use HTML link if available, otherwise fall back to first link or CAP link
if html_link_url:
link_url = html_link_url
elif cap_link_url:
# Convert CAP XML URL to HTML view URL
link_url = self._convert_cap_url_to_html(cap_link_url)
elif link_elems:
# Fallback: use first link found
first_link = link_elems[0]
if first_link.hasAttribute("href"):
href = first_link.getAttribute("href")
if href.endswith(".cap") or "/alerts/" in href:
link_url = self._convert_cap_url_to_html(href)
else:
link_url = href
elif first_link.childNodes and first_link.firstChild:
href = first_link.firstChild.nodeValue
if href.endswith(".cap") or "/alerts/" in href:
link_url = self._convert_cap_url_to_html(href)
else:
link_url = href
# Extract summary/content
summary = ""
summary_elem = entry.getElementsByTagName("summary")
if summary_elem and summary_elem[0].childNodes:
summary = summary_elem[0].childNodes[0].nodeValue if summary_elem[0].childNodes[0].nodeValue else ""
if not summary:
content_elem = entry.getElementsByTagName("content")
if content_elem and content_elem[0].childNodes:
summary = content_elem[0].childNodes[0].nodeValue if content_elem[0].childNodes[0].nodeValue else ""
# Extract NWS headline parameter
nws_headline = ""
params = entry.getElementsByTagName("cap:parameter")
if not params:
params = entry.getElementsByTagName("parameter")
for param in params:
value_name_elem = param.getElementsByTagName("valueName")
value_elem = param.getElementsByTagName("value")
if value_name_elem and value_elem and value_name_elem[0].childNodes and value_elem[0].childNodes:
value_name = value_name_elem[0].childNodes[0].nodeValue if value_name_elem[0].childNodes[0].nodeValue else ""
if value_name == "NWSheadline":
nws_headline = value_elem[0].childNodes[0].nodeValue if value_elem[0].childNodes[0].nodeValue else ""
break
# Extract CAP metadata
event = ""
severity = "Unknown"
urgency = "Unknown"
certainty = "Unknown"
effective = ""
expires = ""
area_desc = ""
office = ""
# Parse title to extract key info
title_lower = title.lower()
# Extract event type from title
if "warning" in title_lower:
event_type = "Warning"
event_match = re.search(r'^([^W]+?)\s+Warning', title, re.IGNORECASE)
if event_match:
event = event_match.group(1).strip()
elif "watch" in title_lower:
event_type = "Watch"
event_match = re.search(r'^([^W]+?)\s+Watch', title, re.IGNORECASE)
if event_match:
event = event_match.group(1).strip()
elif "advisory" in title_lower:
event_type = "Advisory"
event_match = re.search(r'^([^A]+?)\s+Advisory', title, re.IGNORECASE)
if event_match:
event = event_match.group(1).strip()
elif "statement" in title_lower:
event_type = "Statement"
event_match = re.search(r'^([^S]+?)\s+Statement', title, re.IGNORECASE)
if event_match:
event = event_match.group(1).strip()
else:
event = "Special"
# For Special Statements, extract meaningful description from NWS headline
if event.lower() in ["special", "special weather"] and nws_headline:
headline_lower = nws_headline.lower()
if any(phrase in headline_lower for phrase in ['debris flow', 'mudslide']):
event = "Debris Flow"
elif 'landslide' in headline_lower:
event = "Landslide (Burn)" if ('burn' in headline_lower or 'burned area' in headline_lower) else "Landslide"
elif any(phrase in headline_lower for phrase in ['flash flood', 'river flood', 'flood', 'flooding']):
event = "Flood"
elif any(phrase in headline_lower for phrase in ['high wind', 'strong wind', 'damaging wind', 'wind', 'gust']):
event = "Wind"
elif any(phrase in headline_lower for phrase in ['heavy rain', 'excessive rain', 'rain', 'rainfall', 'precipitation']):
if not any(word in headline_lower for word in ['landslide', 'flood', 'wind', 'snow']):
event = "Rainfall"
elif any(phrase in headline_lower for phrase in ['heavy snow', 'blizzard', 'winter storm', 'snow', 'winter']):
event = "Snow"
elif any(phrase in headline_lower for phrase in ['dense fog', 'low visibility', 'fog', 'visibility']):
event = "Fog" if 'fog' in headline_lower else "Visibility"
elif any(phrase in headline_lower for phrase in ['extreme heat', 'excessive heat', 'heat', 'temperature']):
event = "Heat" if 'heat' in headline_lower else "Temperature"
elif any(phrase in headline_lower for phrase in ['storm surge', 'coastal flood', 'marine', 'coastal']):
event = "Marine"
else:
# Extract first meaningful word
headline_words = headline_lower.split()
skip_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'will', 'lead', 'to', 'an', 'increased', 'threat', 'remains', 'in', 'effect', 'until', 'during', 'last', 'week', 'including', 'today'}
meaningful_words = [w for w in headline_words if w not in skip_words and len(w) > 3]
if meaningful_words:
event = meaningful_words[0].capitalize()
# Fallback to summary if still generic
if event.lower() in ["special", "special weather"] and summary:
summary_lower = summary.lower()
if any(word in summary_lower for word in ['landslide', 'debris flow', 'mudslide']):
event = "Landslide"
elif any(word in summary_lower for word in ['hydrologic', 'river', 'flood', 'stream']):
event = "Hydrologic"
elif any(word in summary_lower for word in ['marine', 'coastal', 'beach', 'surf']):
event = "Marine"
elif any(word in summary_lower for word in ['wind', 'gust']):
event = "Wind"
elif any(word in summary_lower for word in ['rain', 'precipitation', 'shower', 'rainfall']):
event = "Rainfall"
if event.lower() in ["special", "special weather"]:
event = "Weather" if "weather" in title_lower else "Special"
else:
event_type = "Unknown"
event = title.split()[0] if title else ""
# Extract times from title
issued_match = re.search(r'issued\s+([^u]+?)\s+until\s+(.+?)\s+by', title, re.IGNORECASE)
if issued_match:
effective = issued_match.group(1).strip()
expires = issued_match.group(2).strip()
else:
until_match = re.search(r'until\s+(.+?)\s+by', title, re.IGNORECASE)
if until_match:
expires = until_match.group(1).strip()
# Extract office from title
office_match = re.search(r'by\s+(.+?)$', title, re.IGNORECASE)
if office_match:
office = office_match.group(1).strip()
# Try to extract CAP elements
def get_node_value(node):
if not node or not node.childNodes:
return ""
text_parts = []
for child in node.childNodes:
if child.nodeType == child.TEXT_NODE:
text_parts.append(child.nodeValue)
elif hasattr(child, 'nodeValue') and child.nodeValue:
text_parts.append(child.nodeValue)
return " ".join(text_parts).strip()
for child in entry.childNodes:
if hasattr(child, 'tagName'):
tag_name = child.tagName
tag_lower = tag_name.lower()
if ('event' in tag_lower or tag_name.endswith(':event')) and not event:
event_val = get_node_value(child)
if event_val:
event = event_val
elif 'severity' in tag_lower or tag_name.endswith(':severity'):
severity_val = get_node_value(child)
if severity_val:
severity = severity_val
elif 'urgency' in tag_lower or tag_name.endswith(':urgency'):
urgency_val = get_node_value(child)
if urgency_val:
urgency = urgency_val
elif 'certainty' in tag_lower or tag_name.endswith(':certainty'):
certainty_val = get_node_value(child)
if certainty_val:
certainty = certainty_val
elif 'effective' in tag_lower or tag_name.endswith(':effective'):
effective_val = get_node_value(child)
if effective_val:
effective = effective_val
elif 'expires' in tag_lower or tag_name.endswith(':expires'):
expires_val = get_node_value(child)
if expires_val:
expires = expires_val
elif ('areadesc' in tag_lower or 'area' in tag_lower or
tag_name.endswith(':areadesc') or tag_name.endswith(':area')):
area_val = get_node_value(child)
if area_val:
area_desc = area_val
# Infer severity if not found
if severity == "Unknown":
if any(word in event.lower() for word in ['extreme', 'tornado', 'hurricane', 'blizzard']):
severity = "Extreme"
elif any(word in event.lower() for word in ['severe', 'warning']):
severity = "Severe"
elif any(word in event.lower() for word in ['advisory', 'moderate']):
severity = "Moderate"
else:
severity = "Minor"
# Infer urgency if not found
if urgency == "Unknown":
if event_type == "Warning":
urgency = "Immediate"
elif event_type == "Watch":
urgency = "Expected"
else:
urgency = "Future"
return {
'id': alert_id,
'title': title,
'summary': summary,
'nws_headline': nws_headline,
'event': event,
'event_type': event_type,
'severity': severity,
'urgency': urgency,
'certainty': certainty,
'effective': effective,
'expires': expires,
'area_desc': area_desc,
'office': office,
'link': link_url
}
except Exception as e:
self.logger.debug(f"Error parsing alert entry: {e}")
return None
async def _format_alert_compact(self, alert: Dict[str, Any], include_details: bool = True) -> str:
"""Format a single alert compactly (same as wx_command).
Args:
alert: Alert dict with event, event_type, severity, expires, office, etc.
include_details: If True, include expiration time and office.
Returns:
str: Formatted alert string.
"""
event = alert.get('event', '')
event_type = alert.get('event_type', '')
severity = alert.get('severity', 'Unknown')
expires = alert.get('expires', '')
office = alert.get('office', '')
link_url = alert.get('link', '')
area_desc = alert.get('area_desc', '')
# Get severity emoji
severity_emoji = {
'Extreme': '🔴',
'Severe': '🟠',
'Moderate': '🟡',
'Minor': '',
'Unknown': ''
}.get(severity, '')
# Format event type abbreviation
event_type_abbrev = {
'Warning': 'Warn',
'Watch': 'Watch',
'Advisory': 'Adv',
'Statement': 'Stmt'
}.get(event_type, event_type)
# Build compact alert string
if include_details:
result = severity_emoji
# Add event and type
if event:
event_lower = event.lower()
event_type_lower = event_type.lower()
if event_type_lower in event_lower:
event_short = event
if len(event) > 15:
words = event.split()
if len(words) > 2:
event_short = ' '.join(words[:2])
else:
event_short = event[:15]
result += event_short
else:
event_short = event
if len(event) > 15:
words = event.split()
if len(words) > 2:
event_short = ' '.join(words[:2])
else:
event_short = event[:15]
result += f"{event_short} {event_type_abbrev}"
else:
result += event_type_abbrev
# Add location (area description) if available - compact format
if area_desc:
# Extract first location from area_desc (often contains multiple locations)
# Format: "Seattle, WA" or "King County; Snohomish County" etc.
locations = [loc.strip() for loc in area_desc.split(';')]
first_location = locations[0]
# Try to extract just city/area name if it's long
# e.g., "Seattle, WA" -> "Seattle" or "King County" -> "King"
if ',' in first_location:
# Has state/country - take just the city part
location_parts = first_location.split(',')
location_short = location_parts[0].strip()
else:
# No comma, might be "King County" -> take first word
location_words = first_location.split()
if len(location_words) > 1 and location_words[-1].lower() in ['county', 'parish', 'borough']:
location_short = location_words[0]
else:
location_short = first_location
# Limit location length to keep message compact
if len(location_short) > 20:
location_short = location_short[:20]
result += f" {location_short}"
# Add expiration time if available
if expires:
expires_compact = self._compact_time(expires)
if any(month in expires_compact for month in ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]):
time_match = re.search(r'(\d+)(AM|PM)', expires_compact, re.IGNORECASE)
if time_match:
hour = time_match.group(1)
am_pm = time_match.group(2)
expires_short = f" til {hour}{am_pm}"
else:
expires_short = f" til {expires_compact[:15]}"
else:
time_match = re.search(r'(\d+):?(\d+)?(AM|PM)', expires_compact, re.IGNORECASE)
if time_match:
hour = time_match.group(1)
am_pm = time_match.group(3)
expires_short = f" til {hour}{am_pm}"
else:
expires_short = f" til {expires_compact[:15]}"
result += expires_short
# Add office if available (abbreviate city name)
if office:
office_parts = office.split()
if len(office_parts) >= 2:
office_org = office_parts[0]
city = office_parts[1] if len(office_parts) > 1 else ""
city_abbrev = self._abbreviate_city_name(city)
office_short = f" by {office_org} {city_abbrev}"
else:
office_short = f" by {office[:10]}"
result += office_short
# Add shortened URL if available and there's space (within 130 char limit)
if link_url and len(result) < 100: # Leave ~30 chars for shortened URL
short_url = await self._shorten_url(link_url)
if short_url:
test_result = result + f" {short_url}"
if len(test_result) <= 130: # Mesh message limit
result = test_result
# If even shortened doesn't fit, try with just a link indicator
elif len(result) < 120:
result = result + " 🔗"
return result
else:
return f"{severity_emoji}{event} {event_type_abbrev}" if event else f"{severity_emoji}{event_type_abbrev}"
def _compact_time(self, time_str: str) -> str:
"""Compact time format (same as wx_command).
Args:
time_str: Time string to format.
Returns:
str: Compact formatted time string.
"""
if not time_str:
return time_str
# Check if it's ISO format
if 'T' in time_str and re.match(r'\d{4}-\d{2}-\d{2}T', time_str):
try:
dt = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
month_abbrevs = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
month = month_abbrevs[dt.month - 1]
day = dt.day
hour = dt.hour
if hour == 0:
hour_12 = 12
am_pm = "AM"
elif hour < 12:
hour_12 = hour
am_pm = "AM"
elif hour == 12:
hour_12 = 12
am_pm = "PM"
else:
hour_12 = hour - 12
am_pm = "PM"
return f"{month} {day} {hour_12}{am_pm}"
except Exception:
pass
# Remove leading zeros from hours
time_str = re.sub(r'(\d+):00(AM|PM)', r'\1\2', time_str)
# Abbreviate month names
month_abbrevs = {
"January": "Jan", "February": "Feb", "March": "Mar", "April": "Apr",
"May": "May", "June": "Jun", "July": "Jul", "August": "Aug",
"September": "Sep", "October": "Oct", "November": "Nov", "December": "Dec"
}
for full, abbrev in month_abbrevs.items():
time_str = time_str.replace(full, abbrev)
# Remove "at" before time
time_str = re.sub(r'\s+at\s+', ' ', time_str)
return time_str
def _abbreviate_city_name(self, city: str) -> str:
"""Abbreviate city names for compact display (same as wx_command).
Args:
city: Full city name.
Returns:
str: Abbreviated city name.
"""
if not city:
return city
city_abbrevs = {
"Seattle": "SEA", "Portland": "PDX", "San Francisco": "SF",
"Los Angeles": "LA", "New York": "NYC", "Chicago": "CHI",
"Houston": "HOU", "Phoenix": "PHX", "Philadelphia": "PHL",
"San Antonio": "SAT", "San Diego": "SAN", "Dallas": "DAL",
"San Jose": "SJC", "Austin": "AUS", "Jacksonville": "JAX",
"Columbus": "CMH", "Fort Worth": "FTW", "Charlotte": "CLT",
"Denver": "DEN", "Washington": "DC", "Boston": "BOS",
"El Paso": "ELP", "Detroit": "DTW", "Nashville": "BNA",
"Oklahoma City": "OKC", "Las Vegas": "LAS", "Memphis": "MEM",
"Louisville": "SDF", "Baltimore": "BWI", "Milwaukee": "MKE",
"Albuquerque": "ABQ", "Tucson": "TUS", "Fresno": "FAT",
"Sacramento": "SAC", "Kansas City": "KC", "Mesa": "MSC",
"Atlanta": "ATL", "Omaha": "OMA", "Colorado Springs": "COS",
"Raleigh": "RDU", "Virginia Beach": "ORF", "Miami": "MIA",
"Oakland": "OAK", "Minneapolis": "MSP", "Tulsa": "TUL",
"Cleveland": "CLE", "Wichita": "ICT", "Arlington": "ARL",
"Tampa": "TPA", "New Orleans": "MSY", "Honolulu": "HNL",
"Anchorage": "ANC", "Bellingham": "BLI", "Everett": "EVE",
"Spokane": "GEG", "Tacoma": "TAC", "Yakima": "YKM",
"Olympia": "OLM", "Vancouver": "YVR", "Victoria": "YYJ"
}
if city in city_abbrevs:
return city_abbrevs[city]
for full_name, abbrev in city_abbrevs.items():
if full_name in city:
return abbrev
words = city.split()
if len(words) > 1:
abbrev = ''.join([w[0].upper() for w in words[:3]])
if len(abbrev) <= 4:
return abbrev
return city[:4].upper() if len(city) >= 4 else city.upper()
def _parse_iso_time(self, time_str: str) -> Optional[float]:
"""Parse ISO 8601 timestamp to Unix timestamp.
Args:
time_str: ISO 8601 time string (e.g., "2025-12-16T15:12:00-08:00" or "2025-12-16T15:12:00Z").
Returns:
Optional[float]: Unix timestamp (seconds since epoch), or None if parsing fails.
"""
if not time_str:
return None
try:
# Handle ISO format with timezone
dt = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
return dt.timestamp()
except (ValueError, AttributeError):
return None
def _parse_alert_time(self, time_str: str) -> Optional[float]:
"""Parse alert effective/issued time string to Unix timestamp.
Args:
time_str: Time string from alert (e.g., "December 16 at 3:12PM PST" or ISO format).
Returns:
Optional[float]: Unix timestamp (seconds since epoch), or None if parsing fails.
"""
if not time_str:
return None
# Try ISO format first (e.g., "2025-12-16T15:12:00-08:00")
if 'T' in time_str or time_str.startswith('202'):
try:
dt = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
return dt.timestamp()
except (ValueError, AttributeError):
pass
# Try parsing from title format: "issued December 16 at 3:12PM PST"
# This is a fallback for when effective time is in text format
try:
# Look for date and time patterns
# Pattern: "December 16 at 3:12PM" or "Dec 16 3:12PM"
date_match = re.search(r'(\w+)\s+(\d+)', time_str)
time_match = re.search(r'(\d+):?(\d+)?(AM|PM)', time_str, re.IGNORECASE)
if date_match and time_match:
# For simplicity, assume it's recent (within last 7 days)
# This is a rough estimate - we'll use current time as fallback
# The important thing is we can compare relative times
now = datetime.now()
# Try to extract day
day = int(date_match.group(2))
hour_str = time_match.group(1)
am_pm = time_match.group(3).upper()
hour = int(hour_str)
if am_pm == 'PM' and hour != 12:
hour += 12
elif am_pm == 'AM' and hour == 12:
hour = 0
# Estimate: assume it's today or yesterday if day matches
# This is approximate but good enough for filtering
if day == now.day:
# Today
dt = now.replace(hour=hour, minute=0, second=0, microsecond=0)
elif day == (now.day - 1) or (now.day == 1 and day >= 28):
# Yesterday or last month
dt = (now - timedelta(days=1)).replace(hour=hour, minute=0, second=0, microsecond=0)
else:
# Rough estimate: assume within last week
dt = now.replace(day=day, hour=hour, minute=0, second=0, microsecond=0)
if dt > now:
dt = dt - timedelta(days=30) # Probably last month
return dt.timestamp()
except Exception:
pass
# If all parsing fails, return None (will use current time as fallback)
return None
def _convert_cap_url_to_html(self, cap_url: str) -> str:
"""Convert CAP XML URL to a more readable format.
For NOAA alerts, converts CAP XML URLs to API URLs that return JSON format.
According to NWS API documentation (https://www.weather.gov/documentation/services-web-api),
the API supports content negotiation and returns GeoJSON by default, which browsers
can display in a readable format with syntax highlighting.
Note: The NWS alerts webpage has been decommissioned, so there is no direct HTML
view of individual alerts. The API JSON format is the most readable option available.
Args:
cap_url: CAP XML URL (e.g., https://api.weather.gov/alerts/urn:oid:....cap)
Returns:
str: API URL that returns JSON format (more readable than XML).
"""
if not cap_url:
return cap_url
# Check if this is a NOAA API alert URL
if "api.weather.gov/alerts/" in cap_url:
# Extract alert identifier from URL
# Pattern: https://api.weather.gov/alerts/urn:oid:... or ...urn:oid:....cap
parts = cap_url.split("/alerts/")
if len(parts) > 1:
alert_id = parts[1].split("?")[0].split("#")[0] # Remove query params and fragments
# Remove .cap extension if present
if alert_id.endswith(".cap"):
alert_id = alert_id[:-4]
# Use the API URL without .cap extension
# Per NWS API docs, this returns GeoJSON by default (application/geo+json)
# which browsers can display with syntax highlighting, making it readable
# This is the best available option since the alerts webpage was decommissioned
return f"https://api.weather.gov/alerts/{alert_id}"
# Check if URL ends with .cap or contains alert identifier
if cap_url.endswith(".cap") or "urn:oid" in cap_url or "urn_oid" in cap_url:
# Try to extract the alert identifier
# Pattern: urn:oid:... or urn_oid_... (may include .cap extension)
# First try to extract from the path
if "/alerts/" in cap_url:
parts = cap_url.split("/alerts/")
if len(parts) > 1:
alert_id = parts[1].split("?")[0].split("#")[0]
if alert_id.endswith(".cap"):
alert_id = alert_id[:-4]
# Convert underscores to colons if needed
alert_id = alert_id.replace("_", ":")
# Use the API URL without .cap extension
# Returns GeoJSON format which browsers display nicely
return f"https://api.weather.gov/alerts/{alert_id}"
# Fallback: extract using regex
match = re.search(r'urn[:_]oid[:_]([^./?&#]+)', cap_url)
if match:
alert_id = match.group(1).replace("_", ":")
# Remove .cap if it was captured
if alert_id.endswith(".cap"):
alert_id = alert_id[:-4]
# Use the API URL - returns GeoJSON format
return f"https://api.weather.gov/alerts/{alert_id}"
# If we can't convert it, return the original URL
# The URL shortener might still work, but users will get XML
return cap_url
async def _shorten_url(self, url: str) -> str:
"""Shorten URL using [External_Data] short_url_website (default v.gd)."""
return await shorten_url(
url,
config=self.bot.config,
session=self.api_session,
logger=self.logger,
)