Files
meshcore-bot/modules/commands/airplanes_command.py
T
agessaman af6f693cff refactor: update max_results configuration for airplanes command
Changed the default value of max_results in config.ini.example from 0 (no limit) to 3, ensuring a more manageable output size for aircraft listings. Updated the AirplanesCommand class to reflect this new default and added logic to handle message size constraints when formatting the aircraft list. This refactor improves the user experience by preventing excessive data in responses while maintaining functionality.
2026-04-14 11:35:00 -07:00

822 lines
32 KiB
Python

#!/usr/bin/env python3
"""
Airplanes command for the MeshCore Bot
Provides aircraft tracking using ADS-B data from airplanes.live or compatible APIs
"""
import asyncio
import math
import re
from typing import Any, Optional
import requests
from ..models import MeshMessage
from ..utils import calculate_distance
from .base_command import BaseCommand
class AirplanesCommand(BaseCommand):
"""Handles aircraft tracking commands using ADS-B data.
Provides aircraft information overhead at companion location, bot location,
or specified coordinates. Supports filtering and detailed single-aircraft display.
"""
# Plugin metadata
name = "airplanes"
keywords = ['airplanes', 'aircraft', 'planes', 'adsb', 'overhead']
description = "Get aircraft overhead (usage: airplanes [location] [options] or overhead [lat,lon])"
category = "general"
cooldown_seconds = 2 # Respect API rate limit of 1 req/sec with buffer
requires_internet = True
# Documentation
short_description = "Get aircraft overhead using ADS-B data"
usage = "airplanes [lat,lon|here] [radius=N] [options]"
examples = ["airplanes", "overhead 47.6,-122.3"]
parameters = [
{"name": "location", "description": "Coordinates or here for your companion's location if advertised"},
{"name": "radius", "description": "Search radius in nautical miles (default: 25)"},
{"name": "filters", "description": "alt=, type=, military, closest, etc."}
]
def __init__(self, bot):
super().__init__(bot)
self.airplanes_enabled = self.get_config_value('Airplanes_Command', 'enabled', fallback=True, value_type='bool')
self.api_url = self.get_config_value('Airplanes_Command', 'api_url', fallback='http://api.airplanes.live/v2/', value_type='str')
self.default_radius = self.get_config_value('Airplanes_Command', 'default_radius', fallback=25, value_type='float')
# Default chosen to fit single-message constraints on the smallest channel budget.
# Channel payload can be as low as 130 bytes; three compact lines are reliable.
self.max_results = self.get_config_value('Airplanes_Command', 'max_results', fallback=3, value_type='int')
self.url_timeout = self.get_config_value('Airplanes_Command', 'url_timeout', fallback=10, value_type='int')
# Ensure API URL ends with /
if self.api_url and not self.api_url.endswith('/'):
self.api_url += '/'
def can_execute(self, message: MeshMessage, skip_channel_check: bool = False) -> bool:
"""Check if this command can be executed with the given message.
Args:
message: The message triggering the command.
Returns:
bool: True if command is enabled and checks pass, False otherwise.
"""
if not self.airplanes_enabled:
return False
return super().can_execute(message)
def get_help_text(self) -> str:
"""Get help text for this command.
Returns:
str: The help text for this command.
"""
return self.translate('commands.airplanes.description')
def _calculate_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate bearing from point 1 to point 2 in degrees.
Args:
lat1: Latitude of point 1 in degrees.
lon1: Longitude of point 1 in degrees.
lat2: Latitude of point 2 in degrees.
lon2: Longitude of point 2 in degrees.
Returns:
float: Bearing in degrees (0-360, where 0 is North).
"""
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
dlon_rad = math.radians(lon2 - lon1)
y = math.sin(dlon_rad) * math.cos(lat2_rad)
x = math.cos(lat1_rad) * math.sin(lat2_rad) - \
math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(dlon_rad)
bearing_rad = math.atan2(y, x)
bearing_deg = (math.degrees(bearing_rad) + 360) % 360
return bearing_deg
def _bearing_to_cardinal(self, bearing: float) -> str:
"""Convert bearing in degrees to cardinal direction.
Args:
bearing: Bearing in degrees (0-360).
Returns:
str: Cardinal direction (N, NE, E, SE, S, SW, W, NW).
"""
directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
index = round(bearing / 45) % 8
return directions[index]
def _get_companion_location(self, message: MeshMessage) -> Optional[tuple[float, float]]:
"""Get companion/sender location from database.
Args:
message: The message object.
Returns:
Optional[Tuple[float, float]]: Tuple of (latitude, longitude) or None.
"""
try:
sender_pubkey = message.sender_pubkey
if not sender_pubkey:
return None
query = '''
SELECT latitude, longitude
FROM complete_contact_tracking
WHERE public_key = ?
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
'''
results = self.bot.db_manager.execute_query(query, (sender_pubkey,))
if results:
row = results[0]
return (row['latitude'], row['longitude'])
return None
except Exception as e:
self.logger.debug(f"Error getting companion location: {e}")
return None
def _get_bot_location(self) -> Optional[tuple[float, float]]:
"""Get bot location from config.
Returns:
Optional[Tuple[float, float]]: Tuple of (latitude, longitude) or None.
"""
try:
lat = self.bot.config.getfloat('Bot', 'bot_latitude', fallback=None)
lon = self.bot.config.getfloat('Bot', 'bot_longitude', fallback=None)
if lat is not None and lon is not None:
# Validate coordinates
if -90 <= lat <= 90 and -180 <= lon <= 180:
return (lat, lon)
return None
except Exception as e:
self.logger.debug(f"Error getting bot location: {e}")
return None
def _parse_coordinates(self, args: str) -> Optional[tuple[float, float]]:
"""Parse latitude and longitude from command arguments.
Args:
args: Command arguments string.
Returns:
Optional[Tuple[float, float]]: Tuple of (latitude, longitude) or None.
"""
# Handle formats: "47.6,-122.3", "47.6 -122.3", "47.6, -122.3"
pattern = r'^\s*(-?\d+\.?\d*)\s*[, ]\s*(-?\d+\.?\d*)\s*$'
match = re.match(pattern, args)
if match:
try:
lat = float(match.group(1))
lon = float(match.group(2))
# Validate ranges
if -90 <= lat <= 90 and -180 <= lon <= 180:
return (lat, lon)
except ValueError:
pass
return None
def _parse_filters(self, args: list[str]) -> dict[str, Any]:
"""Parse filter options from command arguments.
Args:
args: List of command argument strings.
Returns:
Dict[str, Any]: Dictionary of filter options.
"""
filters = {
'radius': self.default_radius,
'alt_min': None,
'alt_max': None,
'speed_min': None,
'speed_max': None,
'aircraft_type': None,
'callsign': None,
'military': False,
'ladd': False,
'pia': False,
'squawk': None,
'limit': self.max_results, # 0 = no configured cap (still single-message bounded at send time)
'sort': 'distance' # distance, altitude, speed
}
for arg in args:
arg_lower = arg.lower()
# Radius
if arg_lower.startswith('radius='):
try:
filters['radius'] = float(arg_lower.split('=')[1])
filters['radius'] = min(250, max(1, filters['radius'])) # Clamp 1-250nm
except (ValueError, IndexError):
pass
# Altitude range
elif arg_lower.startswith('alt='):
try:
alt_str = arg_lower.split('=')[1]
if '-' in alt_str:
parts = alt_str.split('-')
filters['alt_min'] = float(parts[0])
filters['alt_max'] = float(parts[1])
else:
filters['alt_min'] = float(alt_str)
except (ValueError, IndexError):
pass
# Speed range
elif arg_lower.startswith('speed='):
try:
speed_str = arg_lower.split('=')[1]
if '-' in speed_str:
parts = speed_str.split('-')
filters['speed_min'] = float(parts[0])
filters['speed_max'] = float(parts[1])
else:
filters['speed_min'] = float(speed_str)
except (ValueError, IndexError):
pass
# Aircraft type
elif arg_lower.startswith('type='):
filters['aircraft_type'] = arg_lower.split('=')[1].upper()
# Callsign
elif arg_lower.startswith('callsign='):
filters['callsign'] = arg_lower.split('=')[1].upper()
# Squawk
elif arg_lower.startswith('squawk='):
filters['squawk'] = arg_lower.split('=')[1]
# Limit
elif arg_lower.startswith('limit='):
try:
filters['limit'] = int(arg_lower.split('=')[1])
filters['limit'] = min(50, max(1, filters['limit'])) # Clamp 1-50
except (ValueError, IndexError):
pass
# Flags
elif arg_lower == 'military':
filters['military'] = True
elif arg_lower == 'ladd':
filters['ladd'] = True
elif arg_lower == 'pia':
filters['pia'] = True
# Sort options
elif arg_lower in ['closest', 'distance']:
filters['sort'] = 'distance'
elif arg_lower in ['highest', 'altitude']:
filters['sort'] = 'altitude'
elif arg_lower == 'fastest':
filters['sort'] = 'speed'
return filters
def _fetch_aircraft_data(self, lat: float, lon: float, radius: float) -> Optional[dict[str, Any]]:
"""Fetch aircraft data from API.
Args:
lat: Latitude in degrees.
lon: Longitude in degrees.
radius: Search radius in nautical miles.
Returns:
Optional[Dict[str, Any]]: API response JSON or None on error.
"""
try:
# Convert radius from nautical miles to approximate degrees (rough conversion)
# 1 nm ≈ 0.0167 degrees at equator, but we'll use a simple approximation
# More accurate: use the API's native radius parameter if it accepts nm
url = f"{self.api_url}point/{lat}/{lon}/{radius}"
self.logger.debug(f"Fetching aircraft data from {url}")
response = requests.get(url, timeout=self.url_timeout)
response.raise_for_status()
data = response.json()
return data
except requests.exceptions.Timeout:
self.logger.warning("API request timed out")
return None
except requests.exceptions.RequestException as e:
self.logger.warning(f"API request failed: {e}")
return None
except ValueError as e:
self.logger.warning(f"Invalid JSON response: {e}")
return None
def _filter_aircraft(self, aircraft_list: list[dict[str, Any]], filters: dict[str, Any], query_lat: float, query_lon: float) -> list[dict[str, Any]]:
"""Filter and sort aircraft based on criteria.
Args:
aircraft_list: List of aircraft dictionaries.
filters: Filter criteria dictionary.
query_lat: Query latitude for distance calculation.
query_lon: Query longitude for distance calculation.
Returns:
List[Dict[str, Any]]: Filtered and sorted aircraft list.
"""
filtered = []
for aircraft in aircraft_list:
# Skip if no position
if 'lat' not in aircraft or 'lon' not in aircraft:
continue
# Calculate distance
distance_km = calculate_distance(query_lat, query_lon, aircraft['lat'], aircraft['lon'])
distance_nm = distance_km / 1.852 # Convert km to nautical miles
aircraft['_distance_nm'] = distance_nm
aircraft['_distance_km'] = distance_km
# Calculate bearing
bearing = self._calculate_bearing(query_lat, query_lon, aircraft['lat'], aircraft['lon'])
aircraft['_bearing'] = bearing
aircraft['_bearing_cardinal'] = self._bearing_to_cardinal(bearing)
# Apply filters
# Altitude filtering
if filters['alt_min'] is not None or filters['alt_max'] is not None:
alt = aircraft.get('alt_baro') or aircraft.get('alt_geom')
if alt is None or (isinstance(alt, str) and alt.lower() == 'ground'):
continue
if isinstance(alt, str):
continue
if filters['alt_min'] is not None and alt < filters['alt_min']:
continue
if filters['alt_max'] is not None and alt > filters['alt_max']:
continue
if filters['speed_min'] is not None:
gs = aircraft.get('gs')
if gs is None or gs < filters['speed_min']:
continue
if filters['speed_max'] is not None:
gs = aircraft.get('gs')
if gs is None or gs > filters['speed_max']:
continue
if filters['aircraft_type']:
ac_type = aircraft.get('t', '').upper()
if filters['aircraft_type'] not in ac_type:
continue
if filters['callsign']:
callsign = aircraft.get('flight', '').upper()
if filters['callsign'] not in callsign:
continue
if filters['squawk']:
squawk = aircraft.get('squawk', '')
if str(filters['squawk']) != str(squawk):
continue
if filters['military']:
db_flags = aircraft.get('dbFlags', 0)
if not (db_flags & 1): # Military flag
continue
if filters['ladd']:
db_flags = aircraft.get('dbFlags', 0)
if not (db_flags & 8): # LADD flag
continue
if filters['pia']:
db_flags = aircraft.get('dbFlags', 0)
if not (db_flags & 4): # PIA flag
continue
# Check radius (already calculated distance)
if distance_nm > filters['radius']:
continue
filtered.append(aircraft)
# Sort
if filters['sort'] == 'distance':
filtered.sort(key=lambda x: x.get('_distance_nm', float('inf')))
elif filters['sort'] == 'altitude':
filtered.sort(key=lambda x: (x.get('alt_baro') or x.get('alt_geom') or 0), reverse=True)
elif filters['sort'] == 'speed':
filtered.sort(key=lambda x: (x.get('gs') or 0), reverse=True)
# Apply limit (0 = no limit)
if filters['limit'] > 0:
return filtered[:filters['limit']]
return filtered
def _format_single_aircraft(self, aircraft: dict[str, Any], query_lat: float, query_lon: float, max_length: int = 130) -> str:
"""Format detailed single aircraft response (~130 characters).
More user-friendly format: puts distance/bearing first, uses commas in numbers,
and formats in a more readable way.
Args:
aircraft: Aircraft data dictionary.
query_lat: Query latitude.
query_lon: Query longitude.
max_length: Maximum message length (default 130).
Returns:
str: Formatted aircraft string.
"""
# Get basic info
callsign = aircraft.get('flight', '').strip() or aircraft.get('r', 'N/A')
ac_type = aircraft.get('t', '')
operator = aircraft.get('ownOp', '').strip()
# Distance and bearing (most important for overhead command - put first)
distance_nm = aircraft.get('_distance_nm', 0)
bearing_cardinal = aircraft.get('_bearing_cardinal', 'N')
# Altitude with comma formatting
alt = aircraft.get('alt_baro') or aircraft.get('alt_geom')
if isinstance(alt, str):
alt_str = alt
elif alt is not None:
alt_str = f"{int(alt):,}ft" # Add comma separator for readability
else:
alt_str = "N/A"
# Speed
gs = aircraft.get('gs')
speed_str = f"{int(gs)}kt" if gs is not None else "N/A"
# Track
track = aircraft.get('track')
track_str = f"{int(track)}°" if track is not None else "N/A"
# Vertical rate
baro_rate = aircraft.get('baro_rate')
geom_rate = aircraft.get('geom_rate')
vs = baro_rate or geom_rate
vs_str = ""
if vs is not None:
vs_str = f"{'+' if vs > 0 else ''}{int(vs)}fpm"
# Build user-friendly response: "Callsign (Type) Operator distance bearing: altitude @ speed, heading, vertical_rate"
# Example: "QXE2307 (E75L) Horizon Air 7.5nm NW: 21,675ft @ 366kt, 354°, +1600fpm"
response_parts = []
# Callsign and type
if ac_type:
response_parts.append(f"{callsign} ({ac_type})")
else:
response_parts.append(callsign)
# Add operator/airline if available
if operator:
# Abbreviate long operator names to fit within limits
# Common abbreviations for major airlines
operator_abbrev = {
'ALASKA AIRLINES INC': 'Alaska',
'ALASKA AIRLINES': 'Alaska',
'HORIZON AIR': 'Horizon Air',
'HORIZON AIR INDUSTRIES': 'Horizon Air',
'DELTA AIR LINES INC': 'Delta',
'DELTA AIR LINES': 'Delta',
'AMERICAN AIRLINES INC': 'American',
'AMERICAN AIRLINES': 'American',
'UNITED AIR LINES INC': 'United',
'UNITED AIR LINES': 'United',
'SOUTHWEST AIRLINES CO': 'Southwest',
'SOUTHWEST AIRLINES': 'Southwest',
'JETBLUE AIRWAYS CORP': 'JetBlue',
'JETBLUE AIRWAYS': 'JetBlue',
'SPIRIT AIRLINES INC': 'Spirit',
'SPIRIT AIRLINES': 'Spirit',
'FRONTIER AIRLINES INC': 'Frontier',
'FRONTIER AIRLINES': 'Frontier',
'ALLEGIANT AIR LLC': 'Allegiant',
'ALLEGIANT AIR': 'Allegiant',
}
# Try to find abbreviation, otherwise use original (truncated if too long)
operator_display = operator_abbrev.get(operator.upper(), operator)
# If still too long, truncate intelligently (keep first part)
if len(operator_display) > 20:
# Try to truncate at a word boundary
words = operator_display.split()
operator_display = words[0]
if len(operator_display) > 20:
operator_display = operator_display[:17] + "..."
response_parts.append(operator_display)
# Distance and bearing (most important)
response_parts.append(f"{distance_nm:.1f}nm {bearing_cardinal}")
# Main info with colon separator
main_info = f"{alt_str} @ {speed_str}, {track_str}"
if vs_str:
main_info += f", {vs_str}"
response = f"{' '.join(response_parts)}: {main_info}"
# Truncate if too long (shouldn't happen, but safety check)
if len(response) > max_length:
response = response[:max_length-3] + "..."
return response
def _format_aircraft_list(
self,
aircraft_list: list[dict[str, Any]],
query_lat: float,
query_lon: float,
max_length: Optional[int] = None,
) -> str:
"""Format compact list for multiple aircraft.
Args:
aircraft_list: List of aircraft dictionaries.
query_lat: Query latitude.
query_lon: Query longitude.
Returns:
str: Formatted aircraft list.
"""
lines = []
for aircraft in aircraft_list:
callsign = aircraft.get('flight', '').strip() or aircraft.get('r', 'N/A')
alt = aircraft.get('alt_baro') or aircraft.get('alt_geom')
if isinstance(alt, str):
alt_str = alt
elif alt is not None:
alt_str = f"{int(alt)}ft"
else:
alt_str = "N/A"
gs = aircraft.get('gs')
speed_str = f"{int(gs)}kt" if gs is not None else "N/A"
distance_nm = aircraft.get('_distance_nm', 0)
bearing_cardinal = aircraft.get('_bearing_cardinal', 'N')
line = f"{callsign} {alt_str} {speed_str} {distance_nm:.1f}nm {bearing_cardinal}"
lines.append(line)
if max_length is None:
return '\n'.join(lines)
# Pack whole lines into the available single-message budget.
fitted_lines: list[str] = []
used = 0
for line in lines:
line_len = len(line)
projected = line_len if not fitted_lines else used + 1 + line_len
if projected > max_length:
break
fitted_lines.append(line)
used = projected
if not fitted_lines:
# Fall back to a hard-truncated first line for pathological edge cases.
first = lines[0] if lines else ""
return first[: max(0, max_length - 3)].rstrip() + "..." if len(first) > max_length else first
omitted = len(lines) - len(fitted_lines)
if omitted > 0:
suffix = f"\n...+{omitted} more"
if used + len(suffix) <= max_length:
return ''.join(['\n'.join(fitted_lines), suffix])
return '\n'.join(fitted_lines)
async def execute(self, message: MeshMessage) -> bool:
"""Execute the airplanes command.
Args:
message: The message triggering the command.
Returns:
bool: True if executed successfully, False otherwise.
"""
try:
content = message.content.strip()
parts = content.split()
if len(parts) < 1:
help_text = self.translate('commands.airplanes.usage')
await self.send_response(message, help_text)
return True
# Check if this is the "overhead" command
is_overhead_command = False
command_word = parts[0].lower()
# Check if command is "overhead" or "airplanes overhead" / "aircraft overhead" etc.
if command_word == 'overhead':
is_overhead_command = True
args = parts[1:] if len(parts) > 1 else []
elif len(parts) > 1 and parts[1].lower() == 'overhead':
# Handle "airplanes overhead" or "aircraft overhead" etc.
is_overhead_command = True
args = parts[2:] if len(parts) > 2 else []
else:
args = parts[1:] if len(parts) > 1 else []
# Parse location and filters
location = None
# For overhead command, only use companion location or specified coordinates
if is_overhead_command:
# Check if coordinates are provided
if args:
# Try joining first two args (handles "47.444356, -122.309483")
coords_str = ' '.join(args[:2])
coords = self._parse_coordinates(coords_str)
if coords:
location = coords
args = args[2:]
else:
# Try parsing just first arg as coordinates (lat,lon format like "47.444356,-122.309483")
coords = self._parse_coordinates(args[0])
if coords:
location = coords
args = args[1:]
elif len(args) >= 2:
# Handle case where first arg has trailing comma: "47.444356," + "-122.309483"
first_arg = args[0].rstrip(',')
second_arg = args[1]
coords_str = f"{first_arg},{second_arg}"
coords = self._parse_coordinates(coords_str)
if coords:
location = coords
args = args[2:]
# If no coordinates, try companion location only
if location is None:
location = self._get_companion_location(message)
if location:
pass
# If still no location, show specific error
if location is None:
error_msg = self.translate('commands.airplanes.overhead_no_location')
await self.send_response(message, error_msg)
return True
else:
# Regular airplanes command - check for "here" keyword
if args and args[0].lower() == 'here':
location = self._get_bot_location()
args = args[1:]
# Check if first arg is coordinates
elif args:
# Try joining first two args (handles "47.444356, -122.309483")
coords_str = ' '.join(args[:2])
coords = self._parse_coordinates(coords_str)
if coords:
location = coords
args = args[2:]
else:
# Try parsing just first arg as coordinates (lat,lon format like "47.444356,-122.309483")
coords = self._parse_coordinates(args[0])
if coords:
location = coords
args = args[1:]
elif len(args) >= 2:
# Handle case where first arg has trailing comma: "47.444356," + "-122.309483"
first_arg = args[0].rstrip(',')
second_arg = args[1]
coords_str = f"{first_arg},{second_arg}"
coords = self._parse_coordinates(coords_str)
if coords:
location = coords
args = args[2:]
# If no location specified, try companion then bot
if location is None:
location = self._get_companion_location(message)
if location:
pass
else:
location = self._get_bot_location()
if location:
pass
if location is None:
error_msg = self.translate('commands.airplanes.no_location')
await self.send_response(message, error_msg)
return True
# Parse filters
filters = self._parse_filters(args)
# For overhead command, force single closest aircraft
if is_overhead_command:
filters['limit'] = 1
filters['sort'] = 'distance'
# Fetch aircraft data
api_data = self._fetch_aircraft_data(location[0], location[1], filters['radius'])
if api_data is None:
error_msg = self.translate('commands.airplanes.api_error')
await self.send_response(message, error_msg)
return True
# Extract aircraft list (API uses 'ac' key, not 'aircraft')
aircraft_list = api_data.get('ac', api_data.get('aircraft', []))
if not aircraft_list:
error_msg = self.translate('commands.airplanes.no_aircraft', radius=filters['radius'])
await self.send_response(message, error_msg)
return True
# Filter and sort
filtered = self._filter_aircraft(aircraft_list, filters, location[0], location[1])
if not filtered:
error_msg = self.translate('commands.airplanes.no_aircraft_filtered', radius=filters['radius'])
await self.send_response(message, error_msg)
return True
# Get max message length for this message type
max_length = self.get_max_message_length(message)
# Format and send response
# For overhead command or single result, send as one message
if is_overhead_command or len(filtered) == 1:
response = self._format_single_aircraft(filtered[0], location[0], location[1], max_length)
await self.send_response(message, response)
else:
# Keep list output to a single frame to avoid multi-message flood traffic.
response = self._format_aircraft_list(
filtered,
location[0],
location[1],
max_length=max_length,
)
await self.send_response(message, response)
return True
except Exception as e:
self.logger.error(f"Error executing airplanes command: {e}")
error_msg = self.translate('commands.airplanes.error', error=str(e))
await self.send_response(message, error_msg)
return False
async def _send_split_response(self, message: MeshMessage, response: str, max_length: int):
"""Send response split into multiple messages if it exceeds max_length.
Args:
message: The message to respond to.
response: The full response text.
max_length: Maximum message length.
"""
lines = response.split('\n')
current_message = ""
message_count = 0
for _i, line in enumerate(lines):
# Check if adding this line would exceed max_length
if len(current_message) + len(line) + 1 > max_length: # +1 for newline
# Send current message and start new one
if current_message:
# Per-user rate limit applies only to first message (trigger); skip for continuations
await self.send_response(
message, current_message.rstrip(),
skip_user_rate_limit=(message_count > 0)
)
await asyncio.sleep(2.0) # Delay between messages
message_count += 1
# Start new message
current_message = line
else:
# Add line to current message
if current_message:
current_message += f"\n{line}"
else:
current_message = line
# Send the last message if there's content (continuation; skip per-user rate limit)
if current_message:
await self.send_response(message, current_message, skip_user_rate_limit=True)