mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 12:05:38 +00:00
- Introduced functionality to configure custom weather sources in config.ini, allowing users to specify URLs for different locations. - Enhanced WxCommand and GlobalWxCommand classes to fetch weather data from custom WXSIM sources, improving flexibility in weather reporting. - Implemented methods to retrieve and parse weather data from specified sources, ensuring compatibility with existing weather command structures. - Updated documentation in config.ini.example to guide users on configuring custom weather sources.
766 lines
27 KiB
Python
766 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
WXSIM Plaintext Parser
|
|
Parses WXSIM plaintext.txt forecast files into structured data
|
|
|
|
Based on the PHP parser by Ken True (Saratoga-Weather.org)
|
|
https://github.com/ktrue/WXSIM-forecast/blob/master/plaintext-parser.php
|
|
|
|
Usage:
|
|
from modules.clients.wxsim_parser import WXSIMParser
|
|
|
|
# Parse from URL
|
|
parser = WXSIMParser()
|
|
text = parser.fetch_from_url('https://example.com/plaintext.txt')
|
|
if text:
|
|
forecast = parser.parse(text)
|
|
current = parser.format_current_conditions(forecast, temp_unit='fahrenheit', wind_unit='mph')
|
|
summary = parser.format_forecast_summary(forecast, num_days=7, temp_unit='fahrenheit', wind_unit='mph')
|
|
|
|
# Or parse from file/string
|
|
with open('plaintext.txt', 'r') as f:
|
|
text = f.read()
|
|
forecast = parser.parse(text)
|
|
|
|
# Access structured data
|
|
for period in forecast.periods:
|
|
print(f"{period.day_name}: {period.conditions} {period.high_temp}°C/{period.low_temp}°C")
|
|
"""
|
|
|
|
import re
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
|
|
|
|
class PeriodType(Enum):
    """Whether a forecast period is classified as day, night, or neither."""

    DAY = "day"
    NIGHT = "night"
    UNKNOWN = "unknown"
|
|
|
|
|
|
@dataclass
class HourlyData:
    """One row of the WXSIM hourly forecast table.

    Trailing comments name the table column a field is read from, where the
    column heading differs from the field name.
    """

    date: str  # month name and day of month, e.g. "May 5"
    time: str  # 12-hour clock with A/P marker, e.g. "7:00 A" or "12:00 P"
    hour: int  # the same time on a 24-hour clock (0-23)
    temperature: float
    wind_speed: int
    humidity: int
    sky_cover: int  # %SC column
    visibility: int  # %VST column
    visibility_miles: float  # VIS column
    precip_chance: int  # PC/HR column
    rain_total: float  # RN TOT column
    weather: str  # condition text that follows the numeric columns
|
|
|
|
|
|
@dataclass
class ForecastPeriod:
    """Summary of one forecast period together with the hours it covers.

    Only the three identifying fields are required; every summary value
    defaults to "not available" (``None``, empty string or empty list).
    """

    day_name: str  # e.g. "Friday", "Today"
    date: str  # e.g. "May 5"
    period_type: PeriodType
    high_temp: Optional[float] = None
    low_temp: Optional[float] = None
    conditions: str = ""
    wind_speed: Optional[int] = None
    wind_direction: Optional[str] = None
    precip_chance: Optional[int] = None
    precip_amount: Optional[float] = None
    # default_factory gives each instance its own list
    hourly_data: List[HourlyData] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class WXSIMForecast:
    """Everything extracted from one WXSIM plaintext forecast.

    Every field has an empty default, so a freshly constructed instance
    represents "nothing parsed yet".
    """

    city: str = ""
    station: str = ""
    update_time: str = ""
    update_date: str = ""
    # default_factory gives each instance its own lists
    periods: List[ForecastPeriod] = field(default_factory=list)
    hourly_data: List[HourlyData] = field(default_factory=list)
    raw_text: str = ""
|
|
|
|
|
|
class WXSIMParser:
    """Parser for WXSIM plaintext.txt files"""

    # Weather condition mappings (abbreviated to full descriptions).
    #
    # ORDER MATTERS: the lookup code walks this dict in insertion order and
    # takes the first abbreviation that occurs as a substring of the
    # condition text. An abbreviation that is contained in a longer one
    # ('FAIR' in 'FAIR-P.C.', 'RAIN' in 'CHNC. RAIN', ...) must therefore be
    # listed AFTER the longer one, otherwise the longer entry can never
    # match. Sky-cover codes stay ahead of precipitation codes so that a
    # combined string such as "M.C.-CLDY CHNC. DRZL" still resolves to its
    # sky-cover description.
    WEATHER_CONDITIONS = {
        # Sky cover
        'CLEAR': 'Clear',
        'SUNNY': 'Sunny',
        'FAIR-P.C.': 'Fair to Partly Cloudy',
        'FAIR': 'Fair',
        'P.CLOUDY': 'Partly Cloudy',
        'P.-M.CLDY': 'Partly to Mostly Cloudy',
        'M.CLOUDY': 'Mostly Cloudy',
        'M.C.-CLDY': 'Mostly Cloudy',
        'CLOUDY': 'Cloudy',
        'DNS.OVCST': 'Dense Overcast',
        'OVCST': 'Overcast',
        'FOGGY': 'Foggy',
        # "Chance of ..." codes (must precede the plain codes they contain)
        'CHNC. DRZL': 'Chance Drizzle',
        'CHNC. SHWR': 'Chance Showers',
        'CHNC. RAIN': 'Chance Rain',
        'CHNC. SNOW': 'Chance Snow',
        'CHNC. T-STM': 'Chance Thunderstorm',
        # Plain precipitation codes
        'DRIZZLE': 'Drizzle',
        'DRZL': 'Drizzle',
        'SHOWERS': 'Showers',
        'RAIN': 'Rain',
        'SNOW': 'Snow',
        'T-STM': 'Thunderstorm',
    }
|
|
|
|
def __init__(self):
|
|
"""Initialize the parser"""
|
|
self.current_year = datetime.now().year
|
|
self.current_date = datetime.now()
|
|
|
|
def parse(self, text: str) -> WXSIMForecast:
    """Turn the contents of a WXSIM plaintext.txt file into a forecast.

    Args:
        text: Full text of the plaintext.txt file.

    Returns:
        WXSIMForecast: Always carries ``raw_text``. If no FORECAST RUN
        section is found, all other fields keep their empty defaults.
    """
    forecast = WXSIMForecast(raw_text=text)
    all_lines = text.split('\n')

    # Everything before the FORECAST RUN table (calibration output etc.)
    # is treated as header material only.
    table_start = self._find_forecast_start(all_lines)
    if table_start == -1:
        return forecast

    self._parse_header(all_lines[:table_start], forecast)

    table_lines = all_lines[table_start:]
    rows = self._parse_hourly_data(table_lines)
    forecast.hourly_data = rows

    # The first table row doubles as the forecast's own date/time stamp.
    if rows:
        opening_row = rows[0]
        forecast.update_date = opening_row.date
        forecast.update_time = opening_row.time

    forecast.periods = self._group_into_periods(rows, table_lines)
    return forecast
|
|
|
|
def _find_forecast_start(self, lines: List[str]) -> int:
|
|
"""Find the start of the FORECAST RUN section.
|
|
|
|
Args:
|
|
lines: All lines from the file
|
|
|
|
Returns:
|
|
int: Index of first forecast data line, or -1 if not found
|
|
"""
|
|
for i, line in enumerate(lines):
|
|
if 'FORECAST RUN:' in line.upper():
|
|
# Find the header line "DATE TIME TEMP..."
|
|
for j in range(i, min(i + 10, len(lines))):
|
|
if 'DATE' in lines[j] and 'TIME' in lines[j] and 'TEMP' in lines[j]:
|
|
# Return line after header
|
|
return j + 2 # Skip header and blank line
|
|
return -1
|
|
|
|
def _parse_header(self, lines: List[str], forecast: WXSIMForecast) -> None:
|
|
"""Parse header information (city, station, date).
|
|
|
|
Args:
|
|
lines: Header lines (before FORECAST RUN)
|
|
forecast: Forecast object to populate
|
|
"""
|
|
# Look for city/station info in header
|
|
# WXSIM format may vary, so we'll extract what we can
|
|
for line in lines:
|
|
# Look for common patterns
|
|
if 'FORECAST FOR' in line.upper():
|
|
# Extract city name
|
|
parts = line.split('FORECAST FOR', 1)
|
|
if len(parts) > 1:
|
|
forecast.city = parts[1].strip()
|
|
elif 'BY' in line.upper() and not forecast.station:
|
|
# Extract station/forecaster name
|
|
parts = line.split('BY', 1)
|
|
if len(parts) > 1:
|
|
forecast.station = parts[1].strip()
|
|
|
|
def _parse_hourly_data(self, lines: List[str]) -> List[HourlyData]:
|
|
"""Parse hourly forecast data rows.
|
|
|
|
Args:
|
|
lines: Lines from FORECAST RUN section onwards
|
|
|
|
Returns:
|
|
List[HourlyData]: Parsed hourly data
|
|
"""
|
|
hourly_data = []
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
# Skip non-data lines
|
|
if (line.startswith('DATE') or
|
|
line.startswith('FORECAST') or
|
|
line.startswith('CALIBRATION') or
|
|
line.startswith('SURFACE WIND') or
|
|
line.startswith('AUTO CLOUDS') or
|
|
line.startswith('Press') or
|
|
line.startswith('(Automatically') or
|
|
line.startswith('-') or
|
|
line.startswith('(')):
|
|
continue
|
|
|
|
# Try to parse as data row
|
|
# Format: "May 5 7:00 A 9.3 0 73 95 50 42.2 3 0.0 M.C.-CLDY CHNC. DRZL"
|
|
data = self._parse_data_row(line)
|
|
if data:
|
|
hourly_data.append(data)
|
|
|
|
return hourly_data
|
|
|
|
def _parse_data_row(self, line: str) -> Optional[HourlyData]:
|
|
"""Parse a single data row.
|
|
|
|
Args:
|
|
line: Single line of forecast data
|
|
|
|
Returns:
|
|
Optional[HourlyData]: Parsed data or None if invalid
|
|
"""
|
|
# Pattern: Month Day Time Temp Wind Hum %SC %VST VIS PC/HR RN TOT Weather
|
|
# Example: "May 5 7:00 A 9.3 0 73 95 50 42.2 3 0.0 M.C.-CLDY CHNC. DRZL"
|
|
|
|
# Match month and day at start
|
|
match = re.match(r'^([A-Za-z]+)\s+(\d+)\s+(\d{1,2}):(\d{2})\s+([AP])\s+', line)
|
|
if not match:
|
|
return None
|
|
|
|
month_name = match.group(1)
|
|
day = int(match.group(2))
|
|
hour_12 = int(match.group(3))
|
|
minute = int(match.group(4))
|
|
am_pm = match.group(5)
|
|
|
|
# Convert to 24-hour
|
|
hour_24 = hour_12
|
|
if am_pm == 'P' and hour_12 != 12:
|
|
hour_24 = hour_12 + 12
|
|
elif am_pm == 'A' and hour_12 == 12:
|
|
hour_24 = 0
|
|
|
|
# Extract remaining fields (split by whitespace)
|
|
rest = line[match.end():].strip()
|
|
parts = rest.split()
|
|
|
|
if len(parts) < 8:
|
|
return None
|
|
|
|
try:
|
|
# Parse numeric fields
|
|
temp = float(parts[0])
|
|
wind = int(parts[1])
|
|
humidity = int(parts[2])
|
|
sky_cover = int(parts[3])
|
|
visibility_pct = int(parts[4])
|
|
visibility_miles = float(parts[5])
|
|
precip_chance = int(parts[6])
|
|
rain_total = float(parts[7])
|
|
|
|
# Weather condition is everything after the numeric fields
|
|
weather = ' '.join(parts[8:]) if len(parts) > 8 else ''
|
|
|
|
# Format date string
|
|
date_str = f"{month_name} {day}"
|
|
time_str = f"{hour_12}:{minute:02d} {am_pm}"
|
|
|
|
return HourlyData(
|
|
date=date_str,
|
|
time=time_str,
|
|
hour=hour_24,
|
|
temperature=temp,
|
|
wind_speed=wind,
|
|
humidity=humidity,
|
|
sky_cover=sky_cover,
|
|
visibility=visibility_pct,
|
|
visibility_miles=visibility_miles,
|
|
precip_chance=precip_chance,
|
|
rain_total=rain_total,
|
|
weather=weather
|
|
)
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
def _group_into_periods(self, hourly_data: List[HourlyData], lines: List[str]) -> List[ForecastPeriod]:
|
|
"""Group hourly data into forecast periods (days).
|
|
|
|
Args:
|
|
hourly_data: List of hourly data points
|
|
lines: Original lines (to find day separators)
|
|
|
|
Returns:
|
|
List[ForecastPeriod]: Forecast periods
|
|
"""
|
|
periods = []
|
|
|
|
if not hourly_data:
|
|
return periods
|
|
|
|
# Find day separators in original lines
|
|
day_separators = self._find_day_separators(lines)
|
|
|
|
# Group hourly data by day
|
|
current_day = None
|
|
current_period_data = []
|
|
|
|
for data in hourly_data:
|
|
# Check if this is a new day (by date string or hour reset)
|
|
if current_day is None or data.date != current_day:
|
|
# Save previous period if exists
|
|
if current_period_data:
|
|
period = self._create_period_from_hourly(current_day, current_period_data, day_separators)
|
|
if period:
|
|
periods.append(period)
|
|
|
|
# Start new day
|
|
current_day = data.date
|
|
current_period_data = [data]
|
|
else:
|
|
current_period_data.append(data)
|
|
|
|
# Add final period
|
|
if current_period_data:
|
|
period = self._create_period_from_hourly(current_day, current_period_data, day_separators)
|
|
if period:
|
|
periods.append(period)
|
|
|
|
return periods
|
|
|
|
def _find_day_separators(self, lines: List[str]) -> Dict[str, str]:
|
|
"""Find day name separators in the file.
|
|
|
|
Args:
|
|
lines: All lines from the file
|
|
|
|
Returns:
|
|
Dict[str, str]: Mapping of date string to day name
|
|
"""
|
|
separators = {}
|
|
|
|
# Look for lines like " Friday"
|
|
for i, line in enumerate(lines):
|
|
line_stripped = line.strip()
|
|
# Check if line contains a day name
|
|
day_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
|
|
for day_name in day_names:
|
|
if day_name in line_stripped:
|
|
# Try to find the date from nearby lines
|
|
# Look at next few lines for date pattern
|
|
for j in range(i + 1, min(i + 5, len(lines))):
|
|
date_match = re.search(r'([A-Za-z]+)\s+(\d+)', lines[j])
|
|
if date_match:
|
|
date_str = f"{date_match.group(1)} {date_match.group(2)}"
|
|
separators[date_str] = day_name
|
|
break
|
|
break
|
|
|
|
return separators
|
|
|
|
def _create_period_from_hourly(self, date: str, hourly_data: List[HourlyData],
|
|
day_separators: Dict[str, str]) -> Optional[ForecastPeriod]:
|
|
"""Create a forecast period from hourly data.
|
|
|
|
Args:
|
|
date: Date string (e.g., "May 5")
|
|
hourly_data: Hourly data for this day
|
|
day_separators: Mapping of dates to day names
|
|
|
|
Returns:
|
|
Optional[ForecastPeriod]: Forecast period or None
|
|
"""
|
|
if not hourly_data:
|
|
return None
|
|
|
|
# Get day name - try to determine from date or use separator
|
|
day_name = day_separators.get(date)
|
|
if not day_name:
|
|
# Try to determine day name from date
|
|
try:
|
|
# Parse date (e.g., "May 5")
|
|
month_name, day_num = date.split()
|
|
month_map = {
|
|
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
|
|
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
|
|
}
|
|
month = month_map.get(month_name[:3], 1)
|
|
day = int(day_num)
|
|
|
|
# Create datetime and get day name
|
|
forecast_date = datetime(self.current_year, month, day)
|
|
day_name = forecast_date.strftime('%A')
|
|
|
|
# If it's today, use "Today" instead
|
|
today = datetime.now()
|
|
if forecast_date.date() == today.date():
|
|
day_name = "Today"
|
|
elif forecast_date.date() == (today + timedelta(days=1)).date():
|
|
day_name = "Tomorrow"
|
|
except (ValueError, KeyError):
|
|
day_name = "Today"
|
|
|
|
# Determine period type (day vs night)
|
|
# Day: roughly 6 AM to 6 PM, Night: 6 PM to 6 AM
|
|
day_hours = [d for d in hourly_data if 6 <= d.hour < 18]
|
|
night_hours = [d for d in hourly_data if d.hour < 6 or d.hour >= 18]
|
|
|
|
# Use the period with more data, or default to day
|
|
if len(day_hours) >= len(night_hours):
|
|
period_type = PeriodType.DAY
|
|
primary_data = day_hours if day_hours else hourly_data
|
|
else:
|
|
period_type = PeriodType.NIGHT
|
|
primary_data = night_hours if night_hours else hourly_data
|
|
|
|
# Calculate high/low temps
|
|
temps = [d.temperature for d in hourly_data]
|
|
high_temp = max(temps) if temps else None
|
|
low_temp = min(temps) if temps else None
|
|
|
|
# Get most common weather condition
|
|
conditions = self._get_primary_condition(hourly_data)
|
|
|
|
# Get average wind speed
|
|
wind_speeds = [d.wind_speed for d in hourly_data if d.wind_speed > 0]
|
|
avg_wind = int(sum(wind_speeds) / len(wind_speeds)) if wind_speeds else None
|
|
|
|
# Get max precip chance
|
|
precip_chances = [d.precip_chance for d in hourly_data]
|
|
max_precip_chance = max(precip_chances) if precip_chances else None
|
|
|
|
# Get total precipitation
|
|
total_precip = sum(d.rain_total for d in hourly_data)
|
|
|
|
period = ForecastPeriod(
|
|
day_name=day_name,
|
|
date=date,
|
|
period_type=period_type,
|
|
high_temp=high_temp,
|
|
low_temp=low_temp,
|
|
conditions=conditions,
|
|
wind_speed=avg_wind,
|
|
precip_chance=max_precip_chance,
|
|
precip_amount=total_precip if total_precip > 0 else None,
|
|
hourly_data=hourly_data
|
|
)
|
|
|
|
return period
|
|
|
|
def _get_primary_condition(self, hourly_data: List[HourlyData]) -> str:
|
|
"""Get the primary weather condition from hourly data.
|
|
|
|
Args:
|
|
hourly_data: List of hourly data points
|
|
|
|
Returns:
|
|
str: Primary weather condition description
|
|
"""
|
|
if not hourly_data:
|
|
return "Unknown"
|
|
|
|
# Count condition occurrences
|
|
condition_counts = {}
|
|
for data in hourly_data:
|
|
# Normalize condition text
|
|
condition = data.weather.strip().upper()
|
|
if condition:
|
|
# Expand abbreviations
|
|
for abbrev, full in self.WEATHER_CONDITIONS.items():
|
|
if abbrev in condition:
|
|
condition = full
|
|
break
|
|
condition_counts[condition] = condition_counts.get(condition, 0) + 1
|
|
|
|
if not condition_counts:
|
|
return "Unknown"
|
|
|
|
# Return most common condition
|
|
return max(condition_counts.items(), key=lambda x: x[1])[0]
|
|
|
|
def format_current_conditions(self, forecast: WXSIMForecast,
                              temp_unit: str = 'celsius',
                              wind_unit: str = 'kph') -> str:
    """Render the first forecast hour as a one-line conditions string.

    The first row of the FORECAST RUN table is used as the current
    (starting) conditions; later rows lie further in the future.

    Args:
        forecast: Parsed forecast data
        temp_unit: Temperature unit ('celsius' or 'fahrenheit')
        wind_unit: Wind speed unit ('kph', 'mph', or 'ms')

    Returns:
        str: Condition and temperature, followed by wind, humidity and
        precipitation chance for each of those that is above zero; or
        "No current data available" when there are no hourly rows
    """
    if not forecast.hourly_data:
        return "No current data available"

    now_row = forecast.hourly_data[0]

    shown_temp = self._convert_temp(now_row.temperature, temp_unit)
    degree_label = "°C" if temp_unit != 'fahrenheit' else "°F"

    shown_wind = self._convert_wind(now_row.wind_speed, wind_unit)
    wind_label = self._get_wind_unit_str(wind_unit)

    description = self._normalize_condition(now_row.weather)

    # Segments are joined with single spaces; zero values are left out.
    segments = [f"{description} {shown_temp}{degree_label}"]
    if now_row.wind_speed > 0:
        segments.append(f"Wind {shown_wind}{wind_label}")
    if now_row.humidity > 0:
        segments.append(f"{now_row.humidity}%RH")
    if now_row.precip_chance > 0:
        segments.append(f"{now_row.precip_chance}% PoP")

    return " ".join(segments)
|
|
|
|
def format_forecast_summary(self, forecast: WXSIMForecast, num_days: int = 7,
                            temp_unit: str = 'celsius',
                            wind_unit: str = 'kph') -> str:
    """Format forecast summary for display.

    Args:
        forecast: Parsed forecast data
        num_days: Number of periods to include (taken from the start)
        temp_unit: Temperature unit ('celsius' or 'fahrenheit')
        wind_unit: Wind speed unit ('kph', 'mph', or 'ms'); accepted for
            signature symmetry, the summary itself shows no wind

    Returns:
        str: One line per period ("Fri: Rain 12.0°C/4.5°C 60% PoP"),
        joined with newlines, or "No forecast data available" when the
        forecast has no periods. The PoP part appears only above 30%.
    """
    if not forecast.periods:
        return "No forecast data available"

    temp_symbol = "°F" if temp_unit == 'fahrenheit' else "°C"

    parts = []
    for period in forecast.periods[:num_days]:
        # Compare against None explicitly: a temperature of exactly 0.0 is
        # a real reading and must not be treated as "missing".
        high = (self._convert_temp(period.high_temp, temp_unit)
                if period.high_temp is not None else None)
        low = (self._convert_temp(period.low_temp, temp_unit)
               if period.low_temp is not None else None)

        # First three letters of the day name ("Friday" -> "Fri").
        day_abbrev = period.day_name[:3]

        period_str = f"{day_abbrev}: {period.conditions}"
        if high is not None and low is not None:
            period_str += f" {high}{temp_symbol}/{low}{temp_symbol}"
        elif high is not None:
            period_str += f" {high}{temp_symbol}"
        elif low is not None:
            period_str += f" {low}{temp_symbol}"

        if period.precip_chance and period.precip_chance > 30:
            period_str += f" {period.precip_chance}% PoP"

        parts.append(period_str)

    return "\n".join(parts)
|
|
|
|
def _convert_temp(self, temp_c: float, unit: str) -> float:
|
|
"""Convert temperature from Celsius to requested unit.
|
|
|
|
Args:
|
|
temp_c: Temperature in Celsius
|
|
unit: Target unit ('celsius' or 'fahrenheit')
|
|
|
|
Returns:
|
|
float: Converted temperature
|
|
"""
|
|
if unit == 'fahrenheit':
|
|
return round((temp_c * 9/5) + 32, 1)
|
|
return round(temp_c, 1)
|
|
|
|
def _convert_wind(self, wind_kph: int, unit: str) -> float:
|
|
"""Convert wind speed from km/h to requested unit.
|
|
|
|
Args:
|
|
wind_kph: Wind speed in km/h (WXSIM default unit)
|
|
unit: Target unit ('kph', 'mph', or 'ms')
|
|
|
|
Returns:
|
|
float: Converted wind speed
|
|
"""
|
|
# WXSIM outputs wind in km/h, but the values might be in different units
|
|
# depending on configuration. We'll assume km/h as default.
|
|
if unit == 'mph':
|
|
return round(wind_kph * 0.621371, 1)
|
|
elif unit == 'ms':
|
|
return round(wind_kph / 3.6, 1)
|
|
return float(wind_kph)
|
|
|
|
def _get_wind_unit_str(self, unit: str) -> str:
|
|
"""Get wind speed unit string.
|
|
|
|
Args:
|
|
unit: Wind unit ('kph', 'mph', or 'ms')
|
|
|
|
Returns:
|
|
str: Unit string
|
|
"""
|
|
unit_map = {
|
|
'kph': 'km/h',
|
|
'mph': 'mph',
|
|
'ms': 'm/s'
|
|
}
|
|
return unit_map.get(unit, 'km/h')
|
|
|
|
def _normalize_condition(self, condition: str) -> str:
|
|
"""Normalize weather condition text.
|
|
|
|
Args:
|
|
condition: Raw condition text from WXSIM
|
|
|
|
Returns:
|
|
str: Normalized condition description
|
|
"""
|
|
condition_upper = condition.strip().upper()
|
|
|
|
# Try to match abbreviations
|
|
for abbrev, full in self.WEATHER_CONDITIONS.items():
|
|
if abbrev in condition_upper:
|
|
return full
|
|
|
|
# Return original if no match
|
|
return condition.strip() if condition else "Unknown"
|
|
|
|
def get_forecast_date(self, forecast: WXSIMForecast) -> Optional[datetime]:
    """Get the forecast date as a datetime object.

    The file carries no year, so one is chosen: the most recent year
    (out of last, this and next year) in which the date is not more than
    one day ahead of the current wall-clock time. The one-day allowance
    keeps a forecast whose first hour is slightly ahead of the local
    clock from being dated a full year back. The current time is read on
    every call, so a long-lived parser instance does not work from a
    stale clock.

    Args:
        forecast: Parsed forecast data; ``update_date`` like "May 5" and
            optional ``update_time`` like "7:00 A" are read

    Returns:
        Optional[datetime]: Forecast date/time (naive, local clock), or
        None when the date is missing, cannot be parsed, or is not a
        valid calendar date in any candidate year
    """
    if not forecast.update_date:
        return None

    month_map = {
        'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
        'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
    }

    try:
        # Parse date string like "May 5". An unknown month raises KeyError
        # and yields None instead of silently being treated as January.
        month_name, day_num = forecast.update_date.split()
        month = month_map[month_name[:3].title()]
        day = int(day_num)

        # Parse time like "7:00 A" or "12:30 P"; midnight if absent.
        hour = 0
        minute = 0
        if forecast.update_time:
            time_match = re.match(r'(\d{1,2}):(\d{2})\s+([AP])', forecast.update_time)
            if time_match:
                hour_12 = int(time_match.group(1))
                minute = int(time_match.group(2))
                am_pm = time_match.group(3)

                # Convert to 24-hour
                hour = hour_12
                if am_pm == 'P' and hour_12 != 12:
                    hour = hour_12 + 12
                elif am_pm == 'A' and hour_12 == 12:
                    hour = 0

        now = datetime.now()
        latest_plausible = now + timedelta(days=1)

        chosen = None
        # Years ascend, so the last candidate that qualifies is the most
        # recent plausible one.
        for year in (now.year - 1, now.year, now.year + 1):
            try:
                candidate = datetime(year, month, day, hour, minute)
            except ValueError:
                # e.g. Feb 29 in a non-leap year, or an impossible time
                continue
            if candidate <= latest_plausible:
                chosen = candidate
        return chosen
    except (ValueError, KeyError, AttributeError):
        return None
|
|
|
|
def is_forecast_stale(self, forecast: WXSIMForecast, max_age_hours: int = 48) -> Tuple[bool, Optional[str]]:
    """Decide whether a forecast should be rejected because of its date.

    Args:
        forecast: Parsed forecast data
        max_age_hours: Maximum age in hours before considered stale
            (default: 48)

    Returns:
        Tuple[bool, Optional[str]]: ``(True, reason)`` when the date is
        unknown, older than ``max_age_hours`` or in the future;
        ``(False, None)`` otherwise
    """
    issued_at = self.get_forecast_date(forecast)
    if not issued_at:
        return True, "Could not determine forecast date"

    age_hours = (datetime.now() - issued_at).total_seconds() / 3600

    if age_hours > max_age_hours:
        return True, f"Forecast is {age_hours:.1f} hours old (max: {max_age_hours}h)"

    # A negative age means the forecast claims to start after "now";
    # that is reported as stale rather than trusted.
    if age_hours < 0:
        return True, f"Forecast date is in the future: {issued_at}"

    return False, None
|
|
|
|
@staticmethod
def fetch_from_url(url: str, timeout: int = 10) -> Optional[str]:
    """Download a WXSIM plaintext.txt file.

    Args:
        url: URL to fetch plaintext.txt from
        timeout: Request timeout in seconds

    Returns:
        Optional[str]: The response text, or None when the request fails,
        returns an HTTP error status, or the text contains neither
        "FORECAST RUN" (any case) nor "DATE"
    """
    try:
        reply = requests.get(url, timeout=timeout)
        reply.raise_for_status()
        body = reply.text

        # Cheap sanity check that this looks like WXSIM output at all.
        looks_like_wxsim = 'FORECAST RUN' in body.upper() or 'DATE' in body
        return body if looks_like_wxsim else None
    except (requests.RequestException, ValueError, AttributeError):
        return None
|