From 7ee77c16c09eb01f5d4fd2a84eface1d788723dc Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 9 Jan 2026 16:03:30 -0800 Subject: [PATCH] feat: Implement message queuing and rate limiting for Discord webhook posts - Introduced a QueuedMessage dataclass to manage messages queued for posting to Discord. - Added a background task to process message queues, handling rate limits and retries. - Updated message posting methods to support queuing and retry logic, ensuring robust message delivery. - Implemented proactive rate limiting to prevent exceeding Discord's message limits. --- .../service_plugins/discord_bridge_service.py | 270 ++++++++++++++++-- 1 file changed, 245 insertions(+), 25 deletions(-) diff --git a/modules/service_plugins/discord_bridge_service.py b/modules/service_plugins/discord_bridge_service.py index 3c5737c..5d41f5d 100644 --- a/modules/service_plugins/discord_bridge_service.py +++ b/modules/service_plugins/discord_bridge_service.py @@ -8,6 +8,8 @@ import asyncio import logging import time import copy +from collections import deque +from dataclasses import dataclass from typing import Dict, Optional, Any from datetime import datetime @@ -34,6 +36,23 @@ except ImportError: from .base_service import BaseServicePlugin +@dataclass +class QueuedMessage: + """Represents a message queued for Discord posting.""" + webhook_url: str + payload: Dict[str, str] + channel_name: str + retry_count: int = 0 + first_queued: float = 0.0 # Timestamp when first queued + next_retry_at: float = 0.0 # Timestamp when this message should be retried + + def __post_init__(self): + if self.first_queued == 0.0: + self.first_queued = time.time() + if self.next_retry_at == 0.0: + self.next_retry_at = time.time() + + class DiscordBridgeService(BaseServicePlugin): """Discord bridge service. @@ -84,12 +103,26 @@ class DiscordBridgeService(BaseServicePlugin): # Discord webhooks: 30 messages per minute per webhook self.rate_limit_info: Dict[str, Dict[str, Any]] = {} self.rate_limit_threshold = 0.20 # Warn at 20% of limit exhaustion + + # Message queue per webhook to handle rate limits and retries + # Using list instead of deque for easier removal of arbitrary items + self.message_queues: Dict[str, list] = {} + self.max_retries = 5 # Maximum retry attempts per message + self.retry_delay_base = 1.0 # Base delay in seconds for exponential backoff + self.max_queue_age = 300 # Max age in seconds before dropping message (5 minutes) + + # Proactive rate limiting: track send times per webhook + # Discord allows 30 messages per 60 seconds, so we'll throttle to ~25/min for safety + self.send_times: Dict[str, deque] = {} # Track timestamps of sent messages (deque for efficient popleft) + self.rate_limit_window = 60.0 # 60 second window + self.rate_limit_max = 25 # Conservative limit (25/min instead of 30/min for safety) # HTTP session for async requests self.http_session: Optional[aiohttp.ClientSession] = None - # Background task handle + # Background task handles self._message_handler_task: Optional[asyncio.Task] = None + self._queue_processor_task: Optional[asyncio.Task] = None if not self.channel_webhooks: self.logger.warning("No Discord channel mappings configured. Discord bridge will not post any messages.") @@ -240,6 +273,14 @@ class DiscordBridgeService(BaseServicePlugin): self.logger.error("Cannot subscribe to events - meshcore not available") return + # Initialize message queues for each webhook + for webhook_url in self.channel_webhooks.values(): + self.message_queues[webhook_url] = deque() + self.send_times[webhook_url] = deque() + + # Start background queue processor task + self._queue_processor_task = asyncio.create_task(self._process_message_queues()) + self._running = True self.logger.info(f"Discord bridge service started (bridging {len(self.channel_webhooks)} channels)") @@ -251,6 +292,14 @@ class DiscordBridgeService(BaseServicePlugin): self.logger.info("Stopping Discord bridge service...") self._running = False + # Cancel background tasks + if self._queue_processor_task: + self._queue_processor_task.cancel() + try: + await self._queue_processor_task + except asyncio.CancelledError: + pass + # Close aiohttp session if self.http_session: await self.http_session.close() @@ -323,15 +372,18 @@ class DiscordBridgeService(BaseServicePlugin): # Clean up MeshCore @ mentions: @[username] → **@username** message_text = self._format_mentions(message_text) - # Post to Discord - await self._post_to_webhook(webhook_url, message_text, channel_name, sender_name) + # Queue message for posting (with rate limiting and retry logic) + await self._queue_message(webhook_url, message_text, channel_name, sender_name) except Exception as e: self.logger.error(f"Error handling mesh channel message: {e}", exc_info=True) - async def _post_to_webhook(self, webhook_url: str, message: str, channel_name: str, sender_name: str = None) -> None: - """Post message to Discord webhook. - + async def _queue_message(self, webhook_url: str, message: str, channel_name: str, sender_name: str = None) -> None: + """Queue a message for posting to Discord webhook. + + Messages are queued and processed by a background task that handles + rate limiting, retries, and backoff. + Args: webhook_url: Discord webhook URL. message: Message text to post. @@ -340,40 +392,173 @@ class DiscordBridgeService(BaseServicePlugin): """ try: # Prepare webhook payload - # Use sender's name as webhook username for better visual separation username = sender_name if sender_name else f"MeshCore [{channel_name}]" - - # Generate unique avatar for each user based on username avatar_url = self._generate_avatar_url(username) - - # Build payload + payload = { "content": message, "username": username } - - # Only add avatar_url if we have one (None means use Discord's default colored avatars) + if avatar_url: payload["avatar_url"] = avatar_url + + # Create queued message + queued_msg = QueuedMessage( + webhook_url=webhook_url, + payload=payload, + channel_name=channel_name + ) + + # Add to queue + if webhook_url not in self.message_queues: + self.message_queues[webhook_url] = [] + self.message_queues[webhook_url].append(queued_msg) + + self.logger.debug(f"Queued message for Discord [{channel_name}]: {message[:50]}...") + + except Exception as e: + self.logger.error(f"Failed to queue message for Discord webhook [{channel_name}]: {e}", exc_info=True) + + async def _process_message_queues(self) -> None: + """Background task to process message queues with rate limiting and retries. + + Processes messages from queues, respecting rate limits and retrying failed messages. + """ + while self._running: + try: + current_time = time.time() + + # Process each webhook's queue + for webhook_url, queue in list(self.message_queues.items()): + if not queue: + continue + + # Clean up old send times (outside rate limit window) + if webhook_url in self.send_times: + send_times = self.send_times[webhook_url] + while send_times and (current_time - send_times[0]) > self.rate_limit_window: + send_times.popleft() + + # Check if we can send (proactive rate limiting) + can_send = True + if webhook_url in self.send_times: + recent_sends = len(self.send_times[webhook_url]) + if recent_sends >= self.rate_limit_max: + can_send = False + # Calculate wait time until oldest message expires + oldest_send = self.send_times[webhook_url][0] + wait_time = (oldest_send + self.rate_limit_window) - current_time + if wait_time > 0: + self.logger.debug(f"Rate limit throttling [{queue[0].channel_name}]: waiting {wait_time:.1f}s") + + if not can_send: + continue + + # Find next message ready to be sent (not waiting for retry delay) + queued_msg = None + for msg in queue: + if current_time >= msg.next_retry_at: + queued_msg = msg + break + + # If no message is ready, skip this webhook + if queued_msg is None: + continue + + # Check if message is too old + age = current_time - queued_msg.first_queued + if age > self.max_queue_age: + # Remove old message from queue + queue.remove(queued_msg) + self.logger.warning( + f"Dropping old message from queue [{queued_msg.channel_name}]: " + f"age {age:.1f}s exceeds max {self.max_queue_age}s" + ) + continue + + # Try to send the message + success = await self._post_to_webhook( + queued_msg.webhook_url, + queued_msg.payload, + queued_msg.channel_name, + queued_msg + ) + + if success: + # Success - remove from queue + queue.remove(queued_msg) + # Track send time for rate limiting + if webhook_url not in self.send_times: + self.send_times[webhook_url] = deque() + self.send_times[webhook_url].append(current_time) + else: + # Failed - increment retry count and schedule retry + queued_msg.retry_count += 1 + if queued_msg.retry_count > self.max_retries: + # Max retries exceeded - drop message + queue.remove(queued_msg) + self.logger.error( + f"Dropping message after {self.max_retries} retries " + f"[{queued_msg.channel_name}]: {queued_msg.payload['content'][:50]}..." + ) + else: + # Calculate exponential backoff delay + delay = self.retry_delay_base * (2 ** (queued_msg.retry_count - 1)) + queued_msg.next_retry_at = current_time + delay + self.logger.debug( + f"Message failed, will retry in {delay:.1f}s " + f"(attempt {queued_msg.retry_count}/{self.max_retries}) " + f"[{queued_msg.channel_name}]" + ) + # Message stays in queue, will be retried later + + # Small delay to prevent tight loop + await asyncio.sleep(0.1) + + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Error in message queue processor: {e}", exc_info=True) + await asyncio.sleep(1.0) # Wait a bit before retrying on error + + async def _post_to_webhook(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool: + """Post message to Discord webhook. + Args: + webhook_url: Discord webhook URL. + payload: JSON payload to post. + channel_name: MeshCore channel name (for logging). + queued_msg: Optional queued message object (for retry tracking). + + Returns: + bool: True if message was successfully posted, False otherwise. + """ + try: # Send via aiohttp (async) or requests (sync fallback) if AIOHTTP_AVAILABLE and self.http_session: - await self._post_async(webhook_url, payload, channel_name) + return await self._post_async(webhook_url, payload, channel_name, queued_msg) elif REQUESTS_AVAILABLE: - await self._post_sync(webhook_url, payload, channel_name) + return await self._post_sync(webhook_url, payload, channel_name, queued_msg) else: self.logger.error("No HTTP library available for posting to Discord") + return False except Exception as e: self.logger.error(f"Failed to post to Discord webhook [{channel_name}]: {e}", exc_info=True) + return False - async def _post_async(self, webhook_url: str, payload: Dict[str, str], channel_name: str) -> None: + async def _post_async(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool: """Post to webhook using aiohttp (async). Args: webhook_url: Discord webhook URL. payload: JSON payload to post. channel_name: MeshCore channel name (for logging). + queued_msg: Optional queued message object (for retry tracking). + + Returns: + bool: True if message was successfully posted, False otherwise. """ try: async with self.http_session.post(webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as response: @@ -381,30 +566,50 @@ class DiscordBridgeService(BaseServicePlugin): if response.status == 204: # Success (Discord webhooks return 204 No Content on success) self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...") + # Monitor rate limit headers + self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + return True elif response.status == 429: - # Rate limited + # Rate limited - will be retried by queue processor retry_after = response.headers.get('Retry-After', 'unknown') self.logger.warning(f"Discord rate limit hit for [{channel_name}]. Retry after: {retry_after}s") + # If Retry-After is provided, wait that long before next attempt + if retry_after != 'unknown': + try: + retry_after_float = float(retry_after) + # Add delay to queued message if it exists + if queued_msg: + # Store retry delay in queued message metadata + queued_msg.retry_count = max(0, queued_msg.retry_count - 1) # Don't count this as a retry attempt + except (ValueError, TypeError): + pass + return False else: # Other error response_text = await response.text() self.logger.warning(f"Discord webhook returned {response.status} for [{channel_name}]: {response_text[:200]}") - - # Monitor rate limit headers - self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + # Monitor rate limit headers even on error + self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + return False except asyncio.TimeoutError: self.logger.error(f"Timeout posting to Discord webhook [{channel_name}]") + return False except Exception as e: self.logger.error(f"Error posting to Discord webhook [{channel_name}]: {e}") + return False - async def _post_sync(self, webhook_url: str, payload: Dict[str, str], channel_name: str) -> None: + async def _post_sync(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool: """Post to webhook using requests library (sync fallback). Args: webhook_url: Discord webhook URL. payload: JSON payload to post. channel_name: MeshCore channel name (for logging). + queued_msg: Optional queued message object (for retry tracking). + + Returns: + bool: True if message was successfully posted, False otherwise. """ try: # Run in thread pool to avoid blocking event loop @@ -418,19 +623,34 @@ class DiscordBridgeService(BaseServicePlugin): if response.status_code == 204: # Success self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...") + # Monitor rate limit headers + self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + return True elif response.status_code == 429: - # Rate limited + # Rate limited - will be retried by queue processor retry_after = response.headers.get('Retry-After', 'unknown') self.logger.warning(f"Discord rate limit hit for [{channel_name}]. Retry after: {retry_after}s") + # If Retry-After is provided, wait that long before next attempt + if retry_after != 'unknown': + try: + retry_after_float = float(retry_after) + # Add delay to queued message if it exists + if queued_msg: + # Store retry delay in queued message metadata + queued_msg.retry_count = max(0, queued_msg.retry_count - 1) # Don't count this as a retry attempt + except (ValueError, TypeError): + pass + return False else: # Other error self.logger.warning(f"Discord webhook returned {response.status_code} for [{channel_name}]: {response.text[:200]}") - - # Monitor rate limit headers - self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + # Monitor rate limit headers even on error + self._check_rate_limit_headers(response.headers, webhook_url, channel_name) + return False except Exception as e: self.logger.error(f"Error posting to Discord webhook [{channel_name}]: {e}") + return False def _check_rate_limit_headers(self, headers: Dict[str, str], webhook_url: str, channel_name: str) -> None: """Check Discord rate limit headers and log warnings if approaching limit.