Add hourly weather forecast support to WxCommand. Updated response formatting to prioritize current conditions and additional details while ensuring display width constraints are respected. Improved observation data retrieval for more accurate current conditions.

This commit is contained in:
agessaman
2025-11-22 12:13:42 -08:00
parent 6b23f977f3
commit 9d2d0e325c
+520 -88
View File
@@ -126,6 +126,9 @@ class WxCommand(BaseCommand):
if last_part == "tomorrow":
forecast_type = "tomorrow"
location_parts = location_parts[:-1]
elif last_part == "hourly":
forecast_type = "hourly"
location_parts = location_parts[:-1]
elif last_part.isdigit():
# Check if it's a number between 2-7
days = int(last_part)
@@ -196,7 +199,7 @@ class WxCommand(BaseCommand):
Args:
location: The location (zipcode or city name)
location_type: "zipcode" or "city"
forecast_type: "default", "tomorrow", or "multiday"
forecast_type: "default", "tomorrow", "multiday", or "hourly"
num_days: Number of days for multiday forecast (2-7)
"""
try:
@@ -275,15 +278,18 @@ class WxCommand(BaseCommand):
if forecast_periods == self.ERROR_FETCHING_DATA:
return self.translate('commands.wx.error_fetching')
weather = self.format_multiday_forecast(forecast_periods, num_days)
elif forecast_type == "hourly":
hourly_periods, points_data = self.get_noaa_hourly_weather(lat, lon)
if hourly_periods == self.ERROR_FETCHING_DATA:
return self.translate('commands.wx.error_fetching')
weather = self.format_hourly_forecast(hourly_periods)
else: # default
weather, points_data = self.get_noaa_weather(lat, lon)
if weather == self.ERROR_FETCHING_DATA:
return self.translate('commands.wx.error_fetching')
# Try to get additional current conditions data
current_conditions = self.get_current_conditions(points_data)
if current_conditions and self._count_display_width(weather) < 120:
weather = f"{weather} {current_conditions}"
# Note: Current conditions are now integrated directly into the current period
# via _add_period_details() using observation station data
# Get weather alerts (only for default forecast type to avoid cluttering)
if forecast_type == "default":
@@ -491,7 +497,6 @@ class WxCommand(BaseCommand):
# Add wind info if available
if wind_speed and wind_direction:
import re
wind_match = re.search(r'(\d+)', wind_speed)
if wind_match:
wind_num = wind_match.group(1)
@@ -499,53 +504,63 @@ class WxCommand(BaseCommand):
if wind_dir:
weather += f" {wind_dir}{wind_num}"
# Add humidity if available and space allows (using display width)
if humidity and self._count_display_width(weather) < 90:
weather += f" {humidity}%RH"
# PRIORITIZE: Add all available details to current period first
# Get observation station data for more accurate current conditions
observation_data = self.get_observation_data(weather_json)
# Add precipitation chance if available and space allows
if precip_chance and self._count_display_width(weather) < 100:
# Use most of the 130 char limit (120 chars) to ensure current period gets full details
# Additional periods will only be added if there's remaining space
# Pass observation_data to use real-time station data instead of parsing from text
weather = self._add_period_details(weather, detailed_forecast, 0, max_length=120, observation_data=observation_data)
# Also add precipitation chance if available (not in helper function)
if precip_chance and self._count_display_width(weather) < 120:
weather += f" 🌦️{precip_chance}%"
# Add UV index if available and space allows
# Also add UV index if available (not in helper function)
uv_index = self.extract_uv_index(detailed_forecast)
if uv_index and self._count_display_width(weather) < 110:
if uv_index and self._count_display_width(weather) < 120:
weather += f" UV{uv_index}"
# Add dew point if available and space allows
dew_point = self.extract_dew_point(detailed_forecast)
if dew_point and self._count_display_width(weather) < 120:
weather += f" 💧{dew_point}°"
# Add visibility if available and space allows
visibility = self.extract_visibility(detailed_forecast)
if visibility and self._count_display_width(weather) < 130:
weather += f" 👁️{visibility}mi"
# Add precipitation probability if available and space allows
precip_prob = self.extract_precip_probability(detailed_forecast)
if precip_prob and self._count_display_width(weather) < 140:
weather += f" 🌦️{precip_prob}%"
# Add wind gusts if available and space allows
wind_gusts = self.extract_wind_gusts(detailed_forecast)
if wind_gusts and self._count_display_width(weather) < 140:
weather += f" 💨{wind_gusts}"
# Add next period (Tonight) and Tomorrow if available
# First, find Tonight and Tomorrow periods
# Add next period (Today, Tonight) and Tomorrow if available
# First, find Today, Tonight, and Tomorrow periods
today_period = None
tonight_period = None
tomorrow_period = None
current_period_name = current.get('name', '').lower()
is_current_tonight = 'tonight' in current_period_name
is_current_night = any(word in current_period_name for word in ['tonight', 'overnight', 'night'])
# Check if current period is a night period (Overnight, Tonight, etc.)
# If so, we should prioritize showing the upcoming daytime period (Today)
for i, period in enumerate(forecast):
period_name = period.get('name', '').lower()
if 'tonight' in period_name and tonight_period is None:
# Look for "Today" period (daytime forecast)
if 'today' in period_name and today_period is None and i > 0:
# Make sure it's not a night period
if 'night' not in period_name and 'tonight' not in period_name:
today_period = (i, period)
elif 'tonight' in period_name and tonight_period is None:
tonight_period = (i, period)
elif 'tomorrow' in period_name and tomorrow_period is None:
tomorrow_period = (i, period)
# If current is a night period and we haven't found Today yet, look for next daytime period
if is_current_night and not today_period:
# Look for the next period that's not a night period
for i, period in enumerate(forecast):
if i > 0: # Skip current period
period_name = period.get('name', '').lower()
# Look for daytime periods (Today, or day names without "night")
if 'today' in period_name and 'night' not in period_name:
today_period = (i, period)
break
# Also check for day names that aren't night periods
day_names = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
if any(day in period_name for day in day_names) and 'night' not in period_name:
today_period = (i, period)
break
# If current is Tonight and we haven't found Tomorrow yet, look for next day's periods
if is_current_tonight and not tomorrow_period:
# Look for periods after Tonight (next day)
@@ -557,10 +572,10 @@ class WxCommand(BaseCommand):
tomorrow_period = (i, period)
break
# Add Tonight if it's the immediate next period (and current is not already Tonight)
if tonight_period and tonight_period[0] == 1 and not is_current_tonight:
period = tonight_period[1]
period_name = self.abbreviate_noaa(period.get('name', 'Tonight'))
# If current is a night period, prioritize adding Today (the upcoming daytime)
if is_current_night and today_period:
period = today_period[1]
period_name = self.abbreviate_noaa(period.get('name', 'Today'))
period_temp = period.get('temperature', '')
period_short = period.get('shortForecast', '')
period_detailed = period.get('detailedForecast', '')
@@ -581,7 +596,6 @@ class WxCommand(BaseCommand):
if period_wind_speed and period_wind_direction:
test_str = weather + period_str
if self._count_display_width(test_str) < 120:
import re
wind_match = re.search(r'(\d+)', period_wind_speed)
if wind_match:
wind_num = wind_match.group(1)
@@ -591,8 +605,75 @@ class WxCommand(BaseCommand):
if self._count_display_width(test_str + wind_info) <= 130:
period_str += wind_info
# Add additional details (humidity, dew point, visibility, etc.)
# But only if current period isn't too long - prioritize current period details
current_weather_len = self._count_display_width(weather)
# Only add details to additional periods if current period is under 110 chars
# This ensures we prioritize current period details first
if current_weather_len < 110:
period_str = self._add_period_details(period_str, period_detailed, current_weather_len, max_length=130)
# Only add if we have space (using display width)
if self._count_display_width(weather + period_str) <= 130: # Leave room for alerts
# Be more conservative - only add if current period is reasonable length
if current_weather_len < 110 and self._count_display_width(weather + period_str) <= 130:
weather += period_str
# Add Tonight if it's the immediate next period (and current is not already Tonight)
# If we already added Today, we can still add Tonight if it's the next period after Today
if tonight_period and not is_current_tonight:
# Only add if it's the immediate next period, or if current is night and we haven't added Today yet
should_add_tonight = False
if is_current_night and today_period:
# If current is night and we added Today, check if Tonight comes after Today
if tonight_period[0] > today_period[0]:
should_add_tonight = True
elif tonight_period[0] == 1:
# If current is not night, Tonight should be the immediate next period
should_add_tonight = True
if should_add_tonight:
period = tonight_period[1]
period_name = self.abbreviate_noaa(period.get('name', 'Tonight'))
period_temp = period.get('temperature', '')
period_short = period.get('shortForecast', '')
period_detailed = period.get('detailedForecast', '')
period_wind_speed = period.get('windSpeed', '')
period_wind_direction = period.get('windDirection', '')
if period_temp and period_short:
# Try to get high/low
period_high_low = self.extract_high_low(period_detailed)
period_emoji = self.get_weather_emoji(period_short)
if period_high_low:
period_str = f" | {period_name}: {period_emoji}{period_short} {period_high_low}"
else:
period_str = f" | {period_name}: {period_emoji}{period_short} {period_temp}°"
# Add wind info if space allows (using display width)
if period_wind_speed and period_wind_direction:
test_str = weather + period_str
if self._count_display_width(test_str) < 120:
wind_match = re.search(r'(\d+)', period_wind_speed)
if wind_match:
wind_num = wind_match.group(1)
wind_dir = self.abbreviate_wind_direction(period_wind_direction)
if wind_dir:
wind_info = f" {wind_dir}{wind_num}"
if self._count_display_width(test_str + wind_info) <= 130:
period_str += wind_info
# Add additional details (humidity, dew point, visibility, etc.)
# But only if current period isn't too long - prioritize current period details
current_weather_len = self._count_display_width(weather)
# Only add details to additional periods if current period is under 110 chars
# This ensures we prioritize current period details first
if current_weather_len < 110:
period_str = self._add_period_details(period_str, period_detailed, current_weather_len, max_length=130)
# Only add if we have space (using display width)
# Be more conservative - only add if current period is reasonable length
if current_weather_len < 110 and self._count_display_width(weather + period_str) <= 130:
weather += period_str
# Always try to add Tomorrow if available (especially if current is Tonight)
@@ -610,9 +691,9 @@ class WxCommand(BaseCommand):
# Try to get high/low for tomorrow
period_high_low = self.extract_high_low(period_detailed)
# Abbreviate forecast text if it's too long (especially when current is Tonight)
# Abbreviate forecast text if it's too long (especially when current is a night period)
abbreviated_forecast = period_short
if is_current_tonight and len(period_short) > 20:
if (is_current_tonight or is_current_night) and len(period_short) > 20:
# Try to shorten forecast text to fit more info
# Remove transitional words and keep meaningful conditions
words = period_short.split()
@@ -654,12 +735,11 @@ class WxCommand(BaseCommand):
period_str = f" | {period_name}: {period_emoji}{abbreviated_forecast} {period_temp}°"
# Add wind info if space allows (using display width)
# Be more aggressive about adding wind when current is Tonight
wind_threshold = 115 if is_current_tonight else 120
# Be more aggressive about adding wind when current is a night period
wind_threshold = 115 if (is_current_tonight or is_current_night) else 120
if period_wind_speed and period_wind_direction:
test_str = weather + period_str
if self._count_display_width(test_str) < wind_threshold:
import re
wind_match = re.search(r'(\d+)', period_wind_speed)
if wind_match:
wind_num = wind_match.group(1)
@@ -669,10 +749,19 @@ class WxCommand(BaseCommand):
if self._count_display_width(test_str + wind_info) <= 130:
period_str += wind_info
# Only add if we have space (using display width, prioritize tomorrow)
# Be more aggressive when current is Tonight - use up to 128 chars (leave 2 for alerts)
max_chars = 128 if is_current_tonight else 130
if self._count_display_width(weather + period_str) <= max_chars:
# Add additional details (humidity, dew point, visibility, etc.)
# But only if current period isn't too long - prioritize current period details
current_weather_len = self._count_display_width(weather)
# Only add details to additional periods if current period is under 110 chars
# This ensures we prioritize current period details first
if current_weather_len < 110:
max_chars = 128 if (is_current_tonight or is_current_night) else 130
period_str = self._add_period_details(period_str, period_detailed, current_weather_len, max_chars)
# Only add if we have space (using display width, prioritize current period)
# Be more conservative - only add if current period is reasonable length
max_chars = 128 if (is_current_tonight or is_current_night) else 130
if current_weather_len < 110 and self._count_display_width(weather + period_str) <= max_chars:
weather += period_str
return weather, weather_json
@@ -681,6 +770,203 @@ class WxCommand(BaseCommand):
self.logger.error(f"Error fetching NOAA weather: {e}")
return self.ERROR_FETCHING_DATA, None
def get_noaa_hourly_weather(self, lat: float, lon: float) -> tuple:
"""Get hourly weather forecast from NOAA
Args:
lat: Latitude
lon: Longitude
Returns:
Tuple of (hourly_periods_list, points_data)
"""
try:
# Get weather data from NOAA
weather_api = f"https://api.weather.gov/points/{lat},{lon}"
# Get the forecast URL
weather_data = requests.get(weather_api, timeout=self.url_timeout)
if not weather_data.ok:
self.logger.warning("Error fetching weather data from NOAA")
return self.ERROR_FETCHING_DATA, None
weather_json = weather_data.json()
hourly_forecast_url = weather_json['properties'].get('forecastHourly')
if not hourly_forecast_url:
self.logger.warning("Hourly forecast not available for this location")
return self.ERROR_FETCHING_DATA, None
# Get the hourly forecast
hourly_data = requests.get(hourly_forecast_url, timeout=self.url_timeout)
if not hourly_data.ok:
self.logger.warning("Error fetching hourly forecast from NOAA")
return self.ERROR_FETCHING_DATA, None
hourly_json = hourly_data.json()
hourly_periods = hourly_json['properties']['periods']
if not hourly_periods:
self.logger.warning("No hourly periods returned from NOAA")
return self.ERROR_FETCHING_DATA, None
return hourly_periods, weather_json
except Exception as e:
self.logger.error(f"Error fetching NOAA hourly weather: {e}")
return self.ERROR_FETCHING_DATA, None
def format_hourly_forecast(self, hourly_periods: list) -> str:
"""Format hourly forecast to fit as many hours as possible in 130 chars
Args:
hourly_periods: List of hourly forecast periods from NOAA
Returns:
Formatted string with one hour per line
"""
try:
if not hourly_periods:
return self.translate('commands.wx.hourly_not_available')
lines = []
current_length = 0
max_length = 130
# Filter to only future hours
now = datetime.now()
future_periods = []
for period in hourly_periods:
start_time_str = period.get('startTime', '')
if start_time_str:
try:
# Parse ISO format with timezone
if 'Z' in start_time_str:
start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
else:
start_time = datetime.fromisoformat(start_time_str)
# Convert to local timezone if needed
if start_time.tzinfo:
# Make naive for comparison
start_time = start_time.replace(tzinfo=None)
if start_time > now:
future_periods.append(period)
except (ValueError, TypeError):
# If parsing fails, include it anyway
future_periods.append(period)
else:
# If no startTime, include it
future_periods.append(period)
if not future_periods:
return "No future hourly periods available"
# Format each hour
for period in future_periods:
start_time_str = period.get('startTime', '')
temp = period.get('temperature', '')
temp_unit = period.get('temperatureUnit', 'F')
short_forecast = period.get('shortForecast', '')
wind_speed = period.get('windSpeed', '')
wind_direction = period.get('windDirection', '')
precip_prob = period.get('probabilityOfPrecipitation', {}).get('value')
# Format time (e.g., "2PM", "10AM")
time_str = ""
if start_time_str:
try:
# Parse ISO format - handle timezone
if 'Z' in start_time_str:
dt = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
elif '+' in start_time_str or start_time_str.count('-') > 2:
# Has timezone info
dt = datetime.fromisoformat(start_time_str)
else:
# No timezone, parse as naive
dt = datetime.fromisoformat(start_time_str)
# Extract hour (assume it's already in local time or close enough)
hour = dt.hour
# Format as 12-hour time
if hour == 0:
time_str = "12AM"
elif hour < 12:
time_str = f"{hour}AM"
elif hour == 12:
time_str = "12PM"
else:
time_str = f"{hour-12}PM"
except (ValueError, TypeError):
time_str = ""
# Build hour line: "10AM: 🌦️ 26% Chance Light Rain 49° SS5"
emoji = self.get_weather_emoji(short_forecast)
# Abbreviate forecast if too long
forecast_short = short_forecast
if len(forecast_short) > 18:
# Take first 2-3 words
words = forecast_short.split()
if len(words) > 3:
forecast_short = ' '.join(words[:3])
else:
forecast_short = forecast_short[:18]
# Build the line - format: "10AM: 🌦️ 26% Chance Light Rain 49° SS5"
line_parts = []
if time_str:
line_parts.append(f"{time_str}:")
# Add emoji
line_parts.append(emoji)
# Add precip probability if > 0% (before forecast text)
if precip_prob is not None and precip_prob > 0:
line_parts.append(f"{precip_prob}%")
# Add forecast text
line_parts.append(forecast_short)
# Add temperature
if temp:
line_parts.append(f"{temp}°")
# Add wind if available (use compact format)
if wind_speed and wind_direction:
wind_match = re.search(r'(\d+)', wind_speed)
if wind_match:
wind_num = wind_match.group(1)
# Get direction abbreviation (first 1-2 chars)
wind_dir_abbrev = wind_direction[:2] if len(wind_direction) >= 2 else wind_direction
# Remove any spaces and make uppercase
wind_dir_abbrev = wind_dir_abbrev.replace(' ', '').upper()
line_parts.append(f"{wind_dir_abbrev}{wind_num}")
line = " ".join(line_parts)
# Check if adding this line would exceed limit
test_lines = lines + [line]
test_message = "\n".join(test_lines)
test_length = self._count_display_width(test_message)
if test_length <= max_length:
lines.append(line)
else:
# This line would exceed limit, stop here
break
if not lines:
return "Hourly forecast not available"
return "\n".join(lines)
except Exception as e:
self.logger.error(f"Error formatting hourly forecast: {e}")
return f"Error formatting hourly forecast: {str(e)}"
def format_tomorrow_forecast(self, forecast: list) -> str:
"""Format a detailed forecast for tomorrow"""
try:
@@ -749,7 +1035,6 @@ class WxCommand(BaseCommand):
# Add wind info
if wind_speed and wind_direction:
import re
wind_match = re.search(r'(\d+)', wind_speed)
if wind_match:
wind_num = wind_match.group(1)
@@ -909,9 +1194,97 @@ class WxCommand(BaseCommand):
self.logger.error(f"Error formatting {num_days}-day forecast: {e}")
return self.translate('commands.wx.multiday_error', num_days=num_days)
def _add_period_details(self, period_str: str, detailed_forecast: str, current_weather_length: int, max_length: int = 130, observation_data: dict = None) -> str:
"""Add additional details (humidity, dew point, visibility, etc.) to a period string
Args:
period_str: The base period string (e.g., " | Today: ☀️Sunny 75°")
detailed_forecast: The detailed forecast text to extract info from
current_weather_length: Current length of the weather string (to check total length)
max_length: Maximum total length allowed (default 130)
observation_data: Optional dict with observation station data (humidity, dew_point, visibility, wind_gusts, pressure)
Returns:
Updated period string with additional details if space allows
"""
result = period_str
current_length = current_weather_length + self._count_display_width(result)
# Extract additional details - prefer observation data if available (more accurate)
if observation_data:
humidity = observation_data.get('humidity')
dew_point = observation_data.get('dew_point')
visibility = observation_data.get('visibility')
wind_gusts = observation_data.get('wind_gusts')
pressure = observation_data.get('pressure')
else:
humidity = None
dew_point = None
visibility = None
wind_gusts = None
pressure = None
# Fall back to parsing from detailed forecast if observation data not available
if not humidity:
humidity = self.extract_humidity(detailed_forecast)
if not dew_point:
dew_point = self.extract_dew_point(detailed_forecast)
if not visibility:
visibility = self.extract_visibility(detailed_forecast)
if not wind_gusts:
wind_gusts = self.extract_wind_gusts(detailed_forecast)
if not pressure:
pressure = self.extract_pressure(detailed_forecast)
# Always try to get precip_prob from detailed forecast (not in observation data)
precip_prob = self.extract_precip_probability(detailed_forecast)
# Add humidity if available and space allows
# Try to add all available details, only skip if they would exceed max_length
if humidity:
humidity_str = f" {humidity}%RH"
if self._count_display_width(result + humidity_str) + current_weather_length <= max_length:
result += humidity_str
current_length = current_weather_length + self._count_display_width(result)
# Add dew point if available and space allows
if dew_point:
dew_str = f" 💧{dew_point}°"
if self._count_display_width(result + dew_str) + current_weather_length <= max_length:
result += dew_str
current_length = current_weather_length + self._count_display_width(result)
# Add visibility if available and space allows
if visibility:
vis_str = f" 👁️{visibility}mi"
if self._count_display_width(result + vis_str) + current_weather_length <= max_length:
result += vis_str
current_length = current_weather_length + self._count_display_width(result)
# Add precipitation probability if available and space allows
if precip_prob:
precip_str = f" 🌦️{precip_prob}%"
if self._count_display_width(result + precip_str) + current_weather_length <= max_length:
result += precip_str
current_length = current_weather_length + self._count_display_width(result)
# Add wind gusts if available and space allows
if wind_gusts:
gust_str = f" 💨{wind_gusts}"
if self._count_display_width(result + gust_str) + current_weather_length <= max_length:
result += gust_str
current_length = current_weather_length + self._count_display_width(result)
# Add pressure if available and space allows
if pressure:
pressure_str = f" 📊{pressure}hPa"
if self._count_display_width(result + pressure_str) + current_weather_length <= max_length:
result += pressure_str
return result
def _count_display_width(self, text: str) -> int:
"""Count display width of text, accounting for emojis which may take 2 display units"""
import re
# Count regular characters
width = len(text)
# Emojis typically take 2 display units in terminals/clients
@@ -1127,7 +1500,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for patterns like "humidity 45%" or "45% humidity"
humidity_patterns = [
r'humidity\s+(\d+)%',
@@ -1148,7 +1520,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for patterns like "20% chance" or "chance of rain 30%"
precip_patterns = [
r'(\d+)%\s+chance',
@@ -1169,7 +1540,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for more specific patterns to avoid false matches
high_low_patterns = [
r'high\s+near\s+(\d+).*?low\s+around\s+(\d+)',
@@ -1211,7 +1581,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for UV index patterns
uv_patterns = [
r'uv\s+index\s+(\d+)',
@@ -1237,7 +1606,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for dew point patterns
dew_point_patterns = [
r'dew point\s+(\d+)',
@@ -1263,7 +1631,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for visibility patterns
visibility_patterns = [
r'visibility\s+(\d+)\s+miles',
@@ -1290,7 +1657,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for precipitation probability patterns
precip_prob_patterns = [
r'(\d+)%\s+chance\s+of\s+(?:rain|precipitation|showers)',
@@ -1319,7 +1685,6 @@ class WxCommand(BaseCommand):
if not text:
return ""
import re
# Look for wind gust patterns
gust_patterns = [
r'gusts\s+to\s+(\d+)\s+mph',
@@ -1342,26 +1707,61 @@ class WxCommand(BaseCommand):
continue
return ""
def extract_pressure(self, text: str) -> str:
"""Extract barometric pressure from forecast text"""
if not text:
return ""
# Look for pressure patterns (hPa, mb, inches of mercury)
pressure_patterns = [
r'pressure\s+(\d+)\s*hpa',
r'pressure\s+(\d+)\s*mb',
r'barometric\s+pressure\s+(\d+)\s*hpa',
r'barometric\s+pressure\s+(\d+)\s*mb',
r'(\d+)\s*hpa',
r'(\d+)\s*mb\s+pressure'
]
for pattern in pressure_patterns:
match = re.search(pattern, text.lower())
if match:
pressure_val = match.group(1)
# Validate pressure (reasonable range 600-1100 hPa/mb)
# Normal sea level is ~1013 hPa, but high elevation locations can be lower
try:
pressure_int = int(pressure_val)
if 600 <= pressure_int <= 1100:
return pressure_val
except ValueError:
continue
return ""
def get_current_conditions(self, points_data: dict) -> str:
"""Get additional current conditions data from NOAA using existing points data"""
def get_observation_data(self, points_data: dict) -> dict:
"""Get observation station data from NOAA and return as a dict
Returns:
Dict with keys: humidity, dew_point, visibility, wind_gusts, pressure
Values are strings ready for display, or None if not available
"""
try:
if not points_data:
return ""
return {}
weather_json = points_data
station_url = weather_json['properties'].get('observationStations')
if not station_url:
return ""
return {}
# Get the nearest station
stations_data = requests.get(station_url, timeout=self.url_timeout)
if not stations_data.ok:
return ""
return {}
stations_json = stations_data.json()
if not stations_json.get('features'):
return ""
return {}
# Get current observations from the nearest station
station_id = stations_json['features'][0]['properties']['stationIdentifier']
@@ -1369,43 +1769,75 @@ class WxCommand(BaseCommand):
obs_data = requests.get(obs_url, timeout=self.url_timeout)
if not obs_data.ok:
return ""
return {}
obs_json = obs_data.json()
if not obs_json.get('properties'):
return ""
return {}
props = obs_json['properties']
conditions = []
obs_data_dict = {}
# Extract useful current conditions with emojis
if props.get('relativeHumidity', {}).get('value'):
humidity = int(props['relativeHumidity']['value'])
conditions.append(f"{humidity}%RH")
# Extract useful current conditions
# Check for None explicitly to handle cases where value exists but is None
humidity_val = props.get('relativeHumidity', {}).get('value')
if humidity_val is not None:
humidity = int(humidity_val)
obs_data_dict['humidity'] = str(humidity)
if props.get('dewpoint', {}).get('value'):
dewpoint = int(props['dewpoint']['value'] * 9/5 + 32) # Convert C to F
conditions.append(f"💧{dewpoint}°")
dewpoint_val = props.get('dewpoint', {}).get('value')
if dewpoint_val is not None:
dewpoint = int(dewpoint_val * 9/5 + 32) # Convert C to F
obs_data_dict['dew_point'] = str(dewpoint)
if props.get('visibility', {}).get('value'):
visibility = int(props['visibility']['value'] * 0.000621371) # Convert m to miles
visibility_val = props.get('visibility', {}).get('value')
if visibility_val is not None:
visibility = int(visibility_val * 0.000621371) # Convert m to miles
if visibility > 0:
conditions.append(f"👁️{visibility}mi")
obs_data_dict['visibility'] = str(visibility)
if props.get('windGust', {}).get('value'):
wind_gust = int(props['windGust']['value'] * 2.237) # Convert m/s to mph
wind_gust_val = props.get('windGust', {}).get('value')
if wind_gust_val is not None:
wind_gust = int(wind_gust_val * 2.237) # Convert m/s to mph
if wind_gust > 10:
conditions.append(f"💨{wind_gust}")
obs_data_dict['wind_gusts'] = str(wind_gust)
if props.get('barometricPressure', {}).get('value'):
pressure = int(props['barometricPressure']['value'] / 100) # Convert Pa to hPa
conditions.append(f"📊{pressure}hPa")
pressure_val = props.get('barometricPressure', {}).get('value')
if pressure_val is not None:
pressure = int(pressure_val / 100) # Convert Pa to hPa
obs_data_dict['pressure'] = str(pressure)
return " ".join(conditions[:3]) # Limit to 3 conditions to avoid overflow
return obs_data_dict
except Exception as e:
self.logger.debug(f"Error getting current conditions: {e}")
self.logger.debug(f"Error getting observation data: {e}")
return {}
def get_current_conditions(self, points_data: dict) -> str:
"""Get additional current conditions data from NOAA using existing points data (legacy method)"""
obs_data = self.get_observation_data(points_data)
if not obs_data:
return ""
conditions = []
# Build conditions list in priority order
if 'humidity' in obs_data:
conditions.append(f"{obs_data['humidity']}%RH")
if 'dew_point' in obs_data:
conditions.append(f"💧{obs_data['dew_point']}°")
if 'visibility' in obs_data:
conditions.append(f"👁️{obs_data['visibility']}mi")
if 'wind_gusts' in obs_data:
conditions.append(f"💨{obs_data['wind_gusts']}")
if 'pressure' in obs_data:
conditions.append(f"📊{obs_data['pressure']}hPa")
return " ".join(conditions[:3]) # Limit to 3 conditions to avoid overflow
def get_weather_emoji(self, condition: str) -> str:
"""Get emoji for weather condition"""