mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-06-04 06:41:21 +00:00
8f04d8ceb6
Extracts canonical path-parsing and packet-hashing logic from modules/utils.py into shared/parsers/path_parser.py and shared/parsers/packet_parser.py, eliminating duplication across three prior implementations. All callers updated to import from shared.parsers.*; modules/utils.py re-exports via shared imports for any remaining internal callers. 2866 tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
820 lines
34 KiB
Python
820 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Utility functions for the MeshCore Bot
|
|
Shared helper functions used across multiple modules
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import socket
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Union
|
|
|
|
from shared.geocoding import calculate_distance
|
|
from shared.parsers.packet_parser import calculate_packet_hash, verify_meshcore_advert_ed25519
|
|
from shared.parsers.path_parser import (
|
|
_HEX_BYTE_TOKEN,
|
|
_normalized_message_path_string,
|
|
bytes_per_hop_from_routing_and_nodes,
|
|
decode_path_len_byte,
|
|
encode_path_len_byte,
|
|
extract_path_node_ids_from_message,
|
|
message_path_bytes_per_hop,
|
|
node_ids_from_path_string,
|
|
parse_path_string,
|
|
parse_trace_payload_route_hashes,
|
|
)
|
|
from shared.text_utils import format_elapsed_display
|
|
|
|
try:
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
except ImportError:
|
|
ZoneInfo = None # type: ignore[misc, assignment]
|
|
ZoneInfoNotFoundError = Exception # type: ignore[misc, assignment]
|
|
|
|
|
|
def is_valid_timezone(tz_str: str) -> bool:
|
|
"""Return True if the string is a valid IANA timezone name."""
|
|
if not (tz_str and tz_str.strip()):
|
|
return False
|
|
if ZoneInfo is not None:
|
|
try:
|
|
ZoneInfo(tz_str.strip())
|
|
return True
|
|
except ZoneInfoNotFoundError:
|
|
return False
|
|
try:
|
|
pytz = __import__("pytz")
|
|
pytz.timezone(tz_str.strip())
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_config_timezone(config: Any, logger: Optional[Any] = None) -> tuple[Any, str]:
|
|
"""Resolve [Bot] timezone from config; fall back to system timezone if invalid or empty.
|
|
|
|
Returns:
|
|
(tz, iana_str): tz is a timezone object for datetime; iana_str is an IANA
|
|
string for APIs (e.g. OpenMeteo). When falling back to system, iana_str is "UTC".
|
|
"""
|
|
timezone_str = (config.get('Bot', 'timezone', fallback='') or '').strip()
|
|
if timezone_str and is_valid_timezone(timezone_str):
|
|
pytz = __import__("pytz")
|
|
return (pytz.timezone(timezone_str), timezone_str)
|
|
if timezone_str and logger:
|
|
logger.warning("Invalid timezone '%s', using system timezone", timezone_str)
|
|
# System timezone for datetime; use "UTC" for API when we don't have an IANA name
|
|
tz = datetime.now().astimezone().tzinfo
|
|
return (tz, "UTC")
|
|
|
|
|
|
def format_temperature_high_low(
|
|
config: Any,
|
|
high: Optional[Union[int, float]],
|
|
low: Optional[Union[int, float]],
|
|
units_str: str,
|
|
logger: Optional[Any] = None,
|
|
) -> str:
|
|
"""Format a daily high/low pair (or single value) using [Weather] templates.
|
|
|
|
Config keys (optional; defaults match prior bot behavior):
|
|
temperature_high_low_format — both values: {high}, {low}, {units}
|
|
temperature_high_only_format — {high}, {units}
|
|
temperature_low_only_format — {low}, {units}
|
|
"""
|
|
section = "Weather"
|
|
default_pair = "H:{high}{units} L:{low}{units}"
|
|
default_high_only = "H:{high}{units}"
|
|
default_low_only = "L:{low}{units}"
|
|
|
|
def _norm(v: Optional[Union[int, float]]) -> Optional[int]:
|
|
if v is None:
|
|
return None
|
|
try:
|
|
if isinstance(v, float):
|
|
return int(round(v))
|
|
return int(v)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
hi = _norm(high)
|
|
lo = _norm(low)
|
|
if hi is None and lo is None:
|
|
return ""
|
|
|
|
if config.has_section(section):
|
|
pair_fmt = config.get(section, "temperature_high_low_format", fallback=default_pair)
|
|
high_only_fmt = config.get(section, "temperature_high_only_format", fallback=default_high_only)
|
|
low_only_fmt = config.get(section, "temperature_low_only_format", fallback=default_low_only)
|
|
else:
|
|
pair_fmt, high_only_fmt, low_only_fmt = default_pair, default_high_only, default_low_only
|
|
|
|
def _try_format(fmt: str, **kwargs: Any) -> Optional[str]:
|
|
try:
|
|
return fmt.format(**kwargs)
|
|
except (KeyError, ValueError, IndexError) as e:
|
|
if logger is not None and hasattr(logger, "warning"):
|
|
logger.warning("Invalid temperature format template %r: %s", fmt, e)
|
|
return None
|
|
|
|
if hi is not None and lo is not None:
|
|
out = _try_format(pair_fmt, high=hi, low=lo, units=units_str)
|
|
if out is not None:
|
|
return out
|
|
return _try_format(default_pair, high=hi, low=lo, units=units_str) or f"H:{hi}{units_str} L:{lo}{units_str}"
|
|
|
|
if hi is not None:
|
|
out = _try_format(high_only_fmt, high=hi, low=lo, units=units_str)
|
|
if out is not None:
|
|
return out
|
|
return _try_format(default_high_only, high=hi, low=lo, units=units_str) or f"H:{hi}{units_str}"
|
|
|
|
out = _try_format(low_only_fmt, high=hi, low=lo, units=units_str)
|
|
if out is not None:
|
|
return out
|
|
return _try_format(default_low_only, high=hi, low=lo, units=units_str) or f"L:{lo}{units_str}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_path(file_path: Union[str, Path], base_dir: Union[str, Path] = '.') -> str:
|
|
"""Resolve a file path relative to a base directory.
|
|
|
|
If the path is absolute, it is returned as-is (no symlink/canonical resolution).
|
|
If the path is relative, it is resolved relative to the base directory.
|
|
|
|
Args:
|
|
file_path: Path to resolve (can be string or Path object).
|
|
base_dir: Base directory for resolving relative paths (default: current directory).
|
|
|
|
Returns:
|
|
str: Resolved absolute path as a string.
|
|
|
|
Examples:
|
|
>>> resolve_path('data.db', '/opt/bot')
|
|
'/opt/bot/data.db'
|
|
>>> resolve_path('/var/lib/bot/data.db', '/opt/bot')
|
|
'/var/lib/bot/data.db'
|
|
"""
|
|
file_path = Path(file_path) if not isinstance(file_path, Path) else file_path
|
|
base_dir = Path(base_dir) if not isinstance(base_dir, Path) else base_dir
|
|
|
|
if file_path.is_absolute():
|
|
# Important on macOS: `/var/...` may be a symlink to `/private/var/...`.
|
|
# Tests (and callers) expect the absolute path string to stay stable.
|
|
return str(file_path)
|
|
else:
|
|
return str((base_dir.resolve() / file_path).resolve())
|
|
|
|
|
|
def check_internet_connectivity(host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0) -> bool:
|
|
"""Check if internet connectivity is available by attempting to connect to a reliable host.
|
|
|
|
First tries a lightweight DNS port check (faster, doesn't require DNS resolution).
|
|
If that fails (e.g., DNS port is blocked), falls back to an HTTP request check.
|
|
|
|
Args:
|
|
host: Host to connect to (default: 8.8.8.8, Google's public DNS).
|
|
port: Port to connect to (default: 53, DNS port).
|
|
timeout: Connection timeout in seconds (default: 3.0).
|
|
|
|
Returns:
|
|
bool: True if connection successful, False otherwise.
|
|
"""
|
|
# First try: DNS port check (fastest, works if DNS port is open)
|
|
try:
|
|
socket.setdefaulttimeout(timeout)
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.connect((host, port))
|
|
sock.close()
|
|
socket.setdefaulttimeout(None) # Reset to default
|
|
return True
|
|
except (OSError, socket.timeout):
|
|
socket.setdefaulttimeout(None) # Reset to default
|
|
# DNS check failed, try HTTP fallback
|
|
pass
|
|
|
|
# Fallback: HTTP request check (works even if DNS port is blocked)
|
|
try:
|
|
# Use a reliable HTTP endpoint that's likely to be accessible
|
|
# Using IP address to avoid DNS resolution issues
|
|
http_url = "http://1.1.1.1" # Cloudflare DNS
|
|
urllib.request.urlopen(http_url, timeout=timeout).close()
|
|
return True
|
|
except (urllib.error.URLError, OSError, socket.timeout):
|
|
# If IP-based check fails, try a hostname-based check
|
|
try:
|
|
http_url = "http://www.google.com"
|
|
urllib.request.urlopen(http_url, timeout=timeout).close()
|
|
return True
|
|
except (urllib.error.URLError, OSError, socket.timeout):
|
|
return False
|
|
|
|
|
|
async def check_internet_connectivity_async(host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0) -> bool:
|
|
"""Async version of check_internet_connectivity.
|
|
|
|
First tries a lightweight DNS port check (faster, doesn't require DNS resolution).
|
|
If that fails (e.g., DNS port is blocked), falls back to an HTTP request check.
|
|
|
|
Args:
|
|
host: Host to connect to (default: 8.8.8.8, Google's public DNS).
|
|
port: Port to connect to (default: 53, DNS port).
|
|
timeout: Connection timeout in seconds (default: 3.0).
|
|
|
|
Returns:
|
|
bool: True if connection successful, False otherwise.
|
|
"""
|
|
# First try: DNS port check (fastest, works if DNS port is open)
|
|
try:
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(host, port),
|
|
timeout=timeout
|
|
)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return True
|
|
except (asyncio.TimeoutError, OSError, ConnectionError):
|
|
# DNS check failed, try HTTP fallback
|
|
pass
|
|
except Exception:
|
|
# Unexpected error, try HTTP fallback
|
|
pass
|
|
|
|
# Fallback: HTTP request check (works even if DNS port is blocked)
|
|
# Run urllib in executor to avoid blocking
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
# Use a reliable HTTP endpoint that's likely to be accessible
|
|
# Using IP address to avoid DNS resolution issues
|
|
http_url = "http://1.1.1.1" # Cloudflare DNS
|
|
await asyncio.wait_for(
|
|
loop.run_in_executor(
|
|
None,
|
|
lambda: urllib.request.urlopen(http_url, timeout=timeout).close()
|
|
),
|
|
timeout=timeout
|
|
)
|
|
return True
|
|
except (asyncio.TimeoutError, urllib.error.URLError, OSError, socket.timeout):
|
|
# If IP-based check fails, try a hostname-based check
|
|
try:
|
|
http_url = "http://www.google.com"
|
|
await asyncio.wait_for(
|
|
loop.run_in_executor(
|
|
None,
|
|
lambda: urllib.request.urlopen(http_url, timeout=timeout).close()
|
|
),
|
|
timeout=timeout
|
|
)
|
|
return True
|
|
except (asyncio.TimeoutError, urllib.error.URLError, OSError, socket.timeout):
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
|
|
|
|
def calculate_path_distances(
|
|
bot: Any, path_str: str, message: Optional[Any] = None
|
|
) -> tuple[str, str]:
|
|
"""Calculate path distance metrics from a path string and optional message.
|
|
|
|
When ``message`` is provided, node IDs are taken from ``routing_info.path_nodes``
|
|
or multi-byte comma parsing of ``message.path`` (same as the test command),
|
|
with a fallback to :func:`parse_path_string` for continuous hex without commas.
|
|
|
|
Args:
|
|
bot: Bot instance (must have db_manager).
|
|
path_str: Path string when no message or for legacy callers.
|
|
message: Optional mesh message for routing_info / path fields.
|
|
|
|
Returns:
|
|
Tuple[str, str]: A tuple containing:
|
|
- path_distance_str: Total distance with segment info (e.g., "123.4km (3 segs, 1 no-loc)").
|
|
- firstlast_distance_str: Distance between first and last repeater (e.g., "45.6km").
|
|
"""
|
|
prefix_hex = getattr(bot, 'prefix_hex_chars', 2)
|
|
|
|
if message is None:
|
|
if not path_str or not str(path_str).strip():
|
|
return "directly (0 hops)", "N/A (direct)"
|
|
path_lower = path_str.lower()
|
|
if "direct" in path_lower or "0 hops" in path_lower:
|
|
return "directly (0 hops)", "N/A (direct)"
|
|
|
|
if not hasattr(bot, 'db_manager'):
|
|
return "unknown distance", "unknown"
|
|
|
|
try:
|
|
node_ids: list[str]
|
|
if message is not None:
|
|
node_ids = extract_path_node_ids_from_message(message)
|
|
if not node_ids and (getattr(message, 'path', None) or ''):
|
|
node_ids = node_ids_from_path_string(message.path, prefix_hex)
|
|
else:
|
|
node_ids = node_ids_from_path_string(path_str, prefix_hex)
|
|
|
|
if len(node_ids) == 0:
|
|
# No nodes parsed - likely direct connection
|
|
return "directly (0 hops)", "N/A (direct)"
|
|
elif len(node_ids) == 1:
|
|
# Single node - local/one hop (no first/last distance since only one node)
|
|
return "locally (1 hop)", "N/A (1 hop)"
|
|
elif len(node_ids) < 2:
|
|
# Edge case - less than 2 nodes
|
|
return "locally (1 hop)", "N/A (1 hop)"
|
|
|
|
# Look up locations for each node ID
|
|
# _get_node_location_from_db returns ((lat, lon), public_key) or None
|
|
node_locations: list[Optional[tuple[float, float]]] = []
|
|
for node_id in node_ids:
|
|
result = _get_node_location_from_db(bot, node_id)
|
|
if result:
|
|
location, _ = result # Extract location tuple, ignore public_key
|
|
node_locations.append(location)
|
|
else:
|
|
node_locations.append(None)
|
|
|
|
# Calculate total path distance (sum of all segments)
|
|
total_distance = 0.0
|
|
segments_with_location = 0
|
|
segments_without_location = 0
|
|
|
|
for i in range(len(node_locations) - 1):
|
|
loc1 = node_locations[i]
|
|
loc2 = node_locations[i + 1]
|
|
|
|
if loc1 and loc2:
|
|
# Both nodes have locations
|
|
segment_distance = calculate_distance(
|
|
loc1[0], loc1[1],
|
|
loc2[0], loc2[1]
|
|
)
|
|
total_distance += segment_distance
|
|
segments_with_location += 1
|
|
else:
|
|
# At least one node missing location
|
|
segments_without_location += 1
|
|
|
|
# Format path_distance string
|
|
if total_distance > 0:
|
|
path_distance_str = f"{total_distance:.1f}km"
|
|
if segments_with_location > 0 or segments_without_location > 0:
|
|
seg_info = []
|
|
if segments_with_location > 0:
|
|
seg_info.append(f"{segments_with_location} segs")
|
|
if segments_without_location > 0:
|
|
seg_info.append(f"{segments_without_location} no-loc")
|
|
if seg_info:
|
|
path_distance_str += f" ({', '.join(seg_info)})"
|
|
else:
|
|
# No distance calculated (all segments missing locations)
|
|
if segments_without_location > 0:
|
|
# We have segments but no location data
|
|
hop_count = len(node_ids)
|
|
path_distance_str = f"unknown distance ({hop_count} hops, no locations)"
|
|
else:
|
|
# Fallback - shouldn't happen but provide meaningful text
|
|
hop_count = len(node_ids)
|
|
path_distance_str = f"unknown distance ({hop_count} hops)"
|
|
|
|
# Calculate first-to-last distance
|
|
firstlast_distance_str = ""
|
|
first_location = node_locations[0]
|
|
last_location = node_locations[-1]
|
|
|
|
if first_location and last_location:
|
|
firstlast_distance = calculate_distance(
|
|
first_location[0], first_location[1],
|
|
last_location[0], last_location[1]
|
|
)
|
|
firstlast_distance_str = f"{firstlast_distance:.1f}km"
|
|
elif len(node_ids) >= 2:
|
|
# We have 2+ nodes but missing location data
|
|
firstlast_distance_str = "unknown (no locations)"
|
|
|
|
return path_distance_str, firstlast_distance_str
|
|
|
|
except Exception as e:
|
|
# Log error but don't fail - return empty strings
|
|
if hasattr(bot, 'logger'):
|
|
bot.logger.debug(f"Error calculating path distances: {e}")
|
|
return "", ""
|
|
|
|
|
|
def _get_node_location_from_db(bot: Any, node_id: str, reference_location: Optional[tuple[float, float]] = None, recency_days: Optional[int] = None) -> Optional[tuple[tuple[float, float], Optional[str]]]:
|
|
"""Get location for a node ID from the database.
|
|
|
|
For LoRa networks, prefers shorter distances when there are prefix collisions,
|
|
as LoRa range is limited by the curve of the earth.
|
|
|
|
Args:
|
|
bot: Bot instance (must have db_manager).
|
|
node_id: 2-character hex node ID (e.g., "01", "5f").
|
|
reference_location: Optional (lat, lon) to calculate distance from for LoRa preference.
|
|
recency_days: Optional number of days to filter by recency (only use repeaters heard within this window).
|
|
|
|
Returns:
|
|
Optional[Tuple[Tuple[float, float], Optional[str]]]:
|
|
- ((latitude, longitude), public_key) if found, where public_key may be None
|
|
- None if not found
|
|
"""
|
|
if not hasattr(bot, 'db_manager'):
|
|
return None
|
|
|
|
try:
|
|
# Look up node by public key prefix (first 2 characters)
|
|
prefix_pattern = f"{node_id}%"
|
|
|
|
# Get all candidates with locations, optionally filtered by recency
|
|
# Include public_key so we can return it when distance-based selection is used
|
|
if recency_days is not None:
|
|
query = f'''
|
|
SELECT latitude, longitude, is_starred, public_key,
|
|
COALESCE(last_advert_timestamp, last_heard) as last_seen
|
|
FROM complete_contact_tracking
|
|
WHERE public_key LIKE ?
|
|
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
AND latitude != 0 AND longitude != 0
|
|
AND role IN ('repeater', 'roomserver')
|
|
AND COALESCE(last_advert_timestamp, last_heard) >= datetime('now', '-{recency_days} days')
|
|
'''
|
|
results = bot.db_manager.execute_query(query, (prefix_pattern,))
|
|
else:
|
|
query = '''
|
|
SELECT latitude, longitude, is_starred, public_key,
|
|
COALESCE(last_advert_timestamp, last_heard) as last_seen
|
|
FROM complete_contact_tracking
|
|
WHERE public_key LIKE ?
|
|
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
AND latitude != 0 AND longitude != 0
|
|
AND role IN ('repeater', 'roomserver')
|
|
'''
|
|
results = bot.db_manager.execute_query(query, (prefix_pattern,))
|
|
|
|
if not results:
|
|
return None
|
|
|
|
# If we have a reference location, prefer shorter distances (LoRa range limitation)
|
|
if reference_location and len(results) > 1:
|
|
ref_lat, ref_lon = reference_location
|
|
|
|
# Calculate distances and sort by distance (shorter first)
|
|
candidates_with_distance = []
|
|
for row in results:
|
|
lat = row.get('latitude')
|
|
lon = row.get('longitude')
|
|
if lat is not None and lon is not None:
|
|
distance = calculate_distance(ref_lat, ref_lon, float(lat), float(lon))
|
|
is_starred = row.get('is_starred', False)
|
|
last_seen = row.get('last_seen', '')
|
|
candidates_with_distance.append((distance, is_starred, last_seen, row))
|
|
|
|
if candidates_with_distance:
|
|
# Sort by: starred first, then distance (shorter = better for LoRa), then recency (newer first)
|
|
# For recency, we need newer timestamps to sort first. Use a two-pass stable sort:
|
|
# First sort by starred and distance, then stable sort by recency in reverse
|
|
from datetime import datetime
|
|
|
|
def get_timestamp_key(ts_str: Optional[str]) -> float:
|
|
"""Convert timestamp string to sortable key (newer = smaller key for reverse sort)"""
|
|
if not ts_str:
|
|
return float('inf') # Empty timestamps sort last
|
|
try:
|
|
# Parse timestamp and return negative timestamp for descending sort
|
|
dt = datetime.fromisoformat(ts_str.replace(' ', 'T'))
|
|
return -dt.timestamp() # Negate: newer timestamps have larger timestamps, so -timestamp is smaller
|
|
except:
|
|
# Fallback: use string comparison (newer strings are lexicographically greater)
|
|
# To reverse, we'll use a large value minus a hash
|
|
return -len(ts_str) * 1000000 - hash(ts_str)
|
|
|
|
# Sort by: starred first, then distance (shorter = better for LoRa), then recency (newer first)
|
|
# IMPORTANT: Distance takes priority over recency when we have a reference location
|
|
# Use a single sort with all three criteria to ensure proper ordering
|
|
candidates_with_distance.sort(key=lambda x: (
|
|
not x[1], # Starred first (False < True, so starred=True comes before starred=False)
|
|
x[0], # Distance (shorter first) - THIS IS THE PRIMARY FACTOR for LoRa
|
|
get_timestamp_key(x[2]) # Recency (newer first) - only as tiebreaker
|
|
))
|
|
|
|
# Get the best candidate
|
|
best_row = candidates_with_distance[0][3]
|
|
lat = best_row.get('latitude')
|
|
lon = best_row.get('longitude')
|
|
if lat is not None and lon is not None:
|
|
# Return location and also the public key of the selected node (for distance-based selection)
|
|
# This allows us to store which specific node was selected when there's a prefix collision
|
|
# Always return a tuple: (location, public_key or None)
|
|
public_key = best_row.get('public_key')
|
|
return ((float(lat), float(lon)), public_key)
|
|
|
|
# No reference location or single result - use standard ordering
|
|
# Prefer starred, then most recent
|
|
# For recency, parse timestamps properly to ensure newer comes first
|
|
from datetime import datetime
|
|
|
|
def get_timestamp_key_no_ref(ts_str: Optional[str]) -> float:
|
|
"""Convert timestamp string to sortable key (newer = smaller key)"""
|
|
if not ts_str:
|
|
return float('inf') # Empty timestamps sort last
|
|
try:
|
|
dt = datetime.fromisoformat(ts_str.replace(' ', 'T'))
|
|
return -dt.timestamp() # Negate: newer timestamps have larger timestamps, so -timestamp is smaller
|
|
except:
|
|
return -len(ts_str) * 1000000 - hash(ts_str)
|
|
|
|
results.sort(key=lambda x: (
|
|
not x.get('is_starred', False), # Starred first (False < True)
|
|
get_timestamp_key_no_ref(x.get('last_seen', '')) # More recent first (newer = smaller key)
|
|
))
|
|
|
|
row = results[0]
|
|
lat = row.get('latitude')
|
|
lon = row.get('longitude')
|
|
if lat is not None and lon is not None:
|
|
# Return location and also the public key if available (for distance-based selection)
|
|
# Always return a tuple: (location, public_key or None)
|
|
public_key = row.get('public_key')
|
|
return ((float(lat), float(lon)), public_key)
|
|
|
|
return None
|
|
except Exception as e:
|
|
if hasattr(bot, 'logger'):
|
|
bot.logger.debug(f"Error getting node location for {node_id}: {e}")
|
|
return None
|
|
|
|
def _get_node_location_and_key_from_db(bot: Any, node_id: str, reference_location: Optional[tuple[float, float]] = None) -> Optional[tuple[tuple[float, float], str]]:
|
|
"""Get location and public key for a node ID from the database.
|
|
|
|
For LoRa networks, prefers shorter distances when there are prefix collisions,
|
|
as LoRa range is limited by the curve of the earth.
|
|
|
|
Args:
|
|
bot: Bot instance (must have db_manager).
|
|
node_id: 2-character hex node ID (e.g., "01", "5f").
|
|
reference_location: Optional (lat, lon) to calculate distance from for LoRa preference.
|
|
|
|
Returns:
|
|
Optional[Tuple[Tuple[float, float], str]]: Tuple of ((latitude, longitude), public_key) or None if not found.
|
|
"""
|
|
if not hasattr(bot, 'db_manager'):
|
|
return None
|
|
|
|
try:
|
|
# Look up node by public key prefix (first 2 characters)
|
|
prefix_pattern = f"{node_id}%"
|
|
|
|
# Get all candidates with locations
|
|
query = '''
|
|
SELECT latitude, longitude, is_starred, public_key,
|
|
COALESCE(last_advert_timestamp, last_heard) as last_seen
|
|
FROM complete_contact_tracking
|
|
WHERE public_key LIKE ?
|
|
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
AND latitude != 0 AND longitude != 0
|
|
AND role IN ('repeater', 'roomserver')
|
|
'''
|
|
|
|
results = bot.db_manager.execute_query(query, (prefix_pattern,))
|
|
|
|
if not results:
|
|
return None
|
|
|
|
# If we have a reference location, prefer shorter distances (LoRa range limitation)
|
|
if reference_location and len(results) > 1:
|
|
ref_lat, ref_lon = reference_location
|
|
|
|
# Calculate distances and sort by distance (shorter first)
|
|
# For LoRa networks, shorter distances are more likely to be correct single-hop connections
|
|
candidates_with_distance = []
|
|
for row in results:
|
|
lat = row.get('latitude')
|
|
lon = row.get('longitude')
|
|
if lat is not None and lon is not None:
|
|
distance = calculate_distance(ref_lat, ref_lon, float(lat), float(lon))
|
|
is_starred = row.get('is_starred', False)
|
|
last_seen = row.get('last_seen', '')
|
|
public_key = row.get('public_key', '')
|
|
candidates_with_distance.append((distance, is_starred, last_seen, public_key, row))
|
|
|
|
if candidates_with_distance:
|
|
# Sort by: starred first (False < True), then distance (shorter = better for LoRa), then recency
|
|
candidates_with_distance.sort(key=lambda x: (
|
|
not x[1], # Starred first (False < True, so starred=True comes before starred=False)
|
|
x[0], # Distance (shorter first - important for LoRa range limitations)
|
|
x[2] if x[2] else '' # More recent first (newer timestamps sort later in string comparison)
|
|
))
|
|
|
|
# Get the best candidate
|
|
best_row = candidates_with_distance[0][4]
|
|
lat = best_row.get('latitude')
|
|
lon = best_row.get('longitude')
|
|
public_key = candidates_with_distance[0][3]
|
|
if lat is not None and lon is not None and public_key:
|
|
return ((float(lat), float(lon)), public_key)
|
|
|
|
# No reference location or single result - use standard ordering
|
|
# Prefer starred, then most recent
|
|
results.sort(key=lambda x: (
|
|
not x.get('is_starred', False), # Starred first (False < True)
|
|
x.get('last_seen', '') if x.get('last_seen') else '' # More recent first
|
|
))
|
|
|
|
row = results[0]
|
|
lat = row.get('latitude')
|
|
lon = row.get('longitude')
|
|
public_key = row.get('public_key', '')
|
|
if lat is not None and lon is not None and public_key:
|
|
return ((float(lat), float(lon)), public_key)
|
|
|
|
return None
|
|
except Exception as e:
|
|
if hasattr(bot, 'logger'):
|
|
bot.logger.debug(f"Error getting node location and key for {node_id}: {e}")
|
|
return None
|
|
|
|
|
|
# Maximum plausible elapsed ms (5 minutes) for device clock validation.
|
|
# Values above indicate device time is far in the past (e.g. epoch); negative = in the future.
|
|
|
|
def format_keyword_response_with_placeholders(
|
|
response_format: str,
|
|
message: Any,
|
|
bot: Any,
|
|
mesh_info: Optional[dict[str, Any]] = None
|
|
) -> str:
|
|
"""Format a keyword response string with all available placeholders.
|
|
|
|
Supports both message-based placeholders and mesh-info-based placeholders.
|
|
This is a shared function used by both Keywords and Scheduled_Messages.
|
|
|
|
Args:
|
|
response_format: Response format string with placeholders.
|
|
message: MeshMessage instance (can be None for scheduled messages).
|
|
bot: Bot instance (must have config, db_manager).
|
|
mesh_info: Optional mesh network info dict (for scheduled message placeholders).
|
|
|
|
Returns:
|
|
str: Formatted response string.
|
|
"""
|
|
try:
|
|
replacements = {}
|
|
|
|
# Message-based placeholders (require message object)
|
|
if message:
|
|
# Basic message fields
|
|
replacements['sender'] = message.sender_id or "Unknown"
|
|
replacements['path'] = message.path or "Unknown"
|
|
replacements['snr'] = message.snr or "Unknown"
|
|
replacements['rssi'] = message.rssi or "Unknown"
|
|
# Compute elapsed from message.timestamp (same as TestCommand) so it's available
|
|
# for all keywords. Using message.elapsed would miss when it's unset on some paths.
|
|
_translator = getattr(bot, 'translator', None)
|
|
replacements['elapsed'] = format_elapsed_display(
|
|
getattr(message, 'timestamp', None), _translator
|
|
)
|
|
|
|
# Build connection_info
|
|
routing_info = message.path or "Unknown routing"
|
|
if "via ROUTE_TYPE_" in routing_info:
|
|
parts = routing_info.split(" via ROUTE_TYPE_")
|
|
if len(parts) > 0:
|
|
routing_info = parts[0]
|
|
|
|
snr_info = f"SNR: {message.snr or 'Unknown'} dB"
|
|
rssi_info = f"RSSI: {message.rssi or 'Unknown'} dBm"
|
|
connection_info = f"{routing_info} | {snr_info} | {rssi_info}"
|
|
replacements['connection_info'] = connection_info
|
|
|
|
# Calculate path distances
|
|
path_distance, firstlast_distance = calculate_path_distances(
|
|
bot, message.path or "", message=message
|
|
)
|
|
replacements['path_distance'] = path_distance
|
|
replacements['firstlast_distance'] = firstlast_distance
|
|
|
|
# Format timestamp
|
|
try:
|
|
tz, _ = get_config_timezone(bot.config, getattr(bot, 'logger', None))
|
|
dt = datetime.now(tz)
|
|
time_str = dt.strftime("%H:%M:%S")
|
|
except Exception:
|
|
time_str = "Unknown"
|
|
|
|
replacements['timestamp'] = time_str
|
|
|
|
# Total hops: use message.hops when set, else parse from path string (e.g. "01,5f (2 hops)")
|
|
hops_val = getattr(message, 'hops', None)
|
|
if hops_val is not None and isinstance(hops_val, int):
|
|
replacements['hops'] = str(hops_val)
|
|
else:
|
|
path_str = message.path or ""
|
|
hop_match = re.search(r'\((\d+)\s*hops?', path_str, re.IGNORECASE)
|
|
if hop_match:
|
|
replacements['hops'] = hop_match.group(1)
|
|
elif re.search(r'\bdirect\b|\b0\s*hops?\b', path_str, re.IGNORECASE):
|
|
replacements['hops'] = "0"
|
|
else:
|
|
replacements['hops'] = "?"
|
|
# Pluralized label: "1 hop", "2 hops", or "?" when unknown
|
|
h = replacements['hops']
|
|
if h == "?":
|
|
replacements['hops_label'] = "?"
|
|
else:
|
|
n = int(h)
|
|
replacements['hops_label'] = "1 hop" if n == 1 else f"{n} hops"
|
|
else:
|
|
# No message - use defaults for message-based placeholders
|
|
replacements['sender'] = "Unknown"
|
|
replacements['path'] = "Unknown"
|
|
replacements['snr'] = "Unknown"
|
|
replacements['rssi'] = "Unknown"
|
|
replacements['elapsed'] = "Unknown"
|
|
replacements['connection_info'] = "Unknown"
|
|
replacements['path_distance'] = ""
|
|
replacements['firstlast_distance'] = ""
|
|
replacements['timestamp'] = "Unknown"
|
|
replacements['hops'] = "?"
|
|
replacements['hops_label'] = "?"
|
|
|
|
# Mesh-info-based placeholders (from scheduled messages)
|
|
if mesh_info:
|
|
replacements.update({
|
|
'total_contacts': mesh_info.get('total_contacts', 0),
|
|
'total_repeaters': mesh_info.get('total_repeaters', 0),
|
|
'total_companions': mesh_info.get('total_companions', 0),
|
|
'total_roomservers': mesh_info.get('total_roomservers', 0),
|
|
'total_sensors': mesh_info.get('total_sensors', 0),
|
|
'recent_activity_24h': mesh_info.get('recent_activity_24h', 0),
|
|
'new_companions_7d': mesh_info.get('new_companions_7d', 0),
|
|
'new_repeaters_7d': mesh_info.get('new_repeaters_7d', 0),
|
|
'new_roomservers_7d': mesh_info.get('new_roomservers_7d', 0),
|
|
'new_sensors_7d': mesh_info.get('new_sensors_7d', 0),
|
|
'total_contacts_30d': mesh_info.get('total_contacts_30d', 0),
|
|
'total_repeaters_30d': mesh_info.get('total_repeaters_30d', 0),
|
|
'total_companions_30d': mesh_info.get('total_companions_30d', 0),
|
|
'total_roomservers_30d': mesh_info.get('total_roomservers_30d', 0),
|
|
'total_sensors_30d': mesh_info.get('total_sensors_30d', 0),
|
|
# Legacy placeholders
|
|
'repeaters': mesh_info.get('total_repeaters', 0),
|
|
'companions': mesh_info.get('total_companions', 0),
|
|
})
|
|
else:
|
|
# No mesh_info - use defaults
|
|
mesh_defaults = {
|
|
'total_contacts': 0,
|
|
'total_repeaters': 0,
|
|
'total_companions': 0,
|
|
'total_roomservers': 0,
|
|
'total_sensors': 0,
|
|
'recent_activity_24h': 0,
|
|
'new_companions_7d': 0,
|
|
'new_repeaters_7d': 0,
|
|
'new_roomservers_7d': 0,
|
|
'new_sensors_7d': 0,
|
|
'total_contacts_30d': 0,
|
|
'total_repeaters_30d': 0,
|
|
'total_companions_30d': 0,
|
|
'total_roomservers_30d': 0,
|
|
'total_sensors_30d': 0,
|
|
'repeaters': 0,
|
|
'companions': 0,
|
|
}
|
|
replacements.update(mesh_defaults)
|
|
|
|
# Format the response with all replacements
|
|
return response_format.format(**replacements)
|
|
|
|
except (KeyError, ValueError) as e:
|
|
# If formatting fails, return as-is (might not have all placeholders)
|
|
if hasattr(bot, 'logger'):
|
|
bot.logger.debug(f"Error formatting response with placeholders: {e}")
|
|
return response_format
|