feat: Implement message queuing and rate limiting for Discord webhook posts

- Introduced a QueuedMessage dataclass to manage messages queued for posting to Discord.
- Added a background task to process message queues, handling rate limits and retries.
- Updated message posting methods to support queuing and retry logic, ensuring robust message delivery.
- Implemented proactive rate limiting to prevent exceeding Discord's message limits.
This commit is contained in:
agessaman
2026-01-09 16:03:30 -08:00
parent ea30227a73
commit 7ee77c16c0
+245 -25
View File
@@ -8,6 +8,8 @@ import asyncio
import logging
import time
import copy
from collections import deque
from dataclasses import dataclass
from typing import Dict, Optional, Any
from datetime import datetime
@@ -34,6 +36,23 @@ except ImportError:
from .base_service import BaseServicePlugin
@dataclass
class QueuedMessage:
"""Represents a message queued for Discord posting."""
webhook_url: str
payload: Dict[str, str]
channel_name: str
retry_count: int = 0
first_queued: float = 0.0 # Timestamp when first queued
next_retry_at: float = 0.0 # Timestamp when this message should be retried
def __post_init__(self):
if self.first_queued == 0.0:
self.first_queued = time.time()
if self.next_retry_at == 0.0:
self.next_retry_at = time.time()
class DiscordBridgeService(BaseServicePlugin):
"""Discord bridge service.
@@ -84,12 +103,26 @@ class DiscordBridgeService(BaseServicePlugin):
# Discord webhooks: 30 messages per minute per webhook
self.rate_limit_info: Dict[str, Dict[str, Any]] = {}
self.rate_limit_threshold = 0.20 # Warn at 20% of limit exhaustion
# Message queue per webhook to handle rate limits and retries
# Using list instead of deque for easier removal of arbitrary items
self.message_queues: Dict[str, list] = {}
self.max_retries = 5 # Maximum retry attempts per message
self.retry_delay_base = 1.0 # Base delay in seconds for exponential backoff
self.max_queue_age = 300 # Max age in seconds before dropping message (5 minutes)
# Proactive rate limiting: track send times per webhook
# Discord allows 30 messages per 60 seconds, so we'll throttle to ~25/min for safety
self.send_times: Dict[str, deque] = {} # Track timestamps of sent messages (deque for efficient popleft)
self.rate_limit_window = 60.0 # 60 second window
self.rate_limit_max = 25 # Conservative limit (25/min instead of 30/min for safety)
# HTTP session for async requests
self.http_session: Optional[aiohttp.ClientSession] = None
# Background task handle
# Background task handles
self._message_handler_task: Optional[asyncio.Task] = None
self._queue_processor_task: Optional[asyncio.Task] = None
if not self.channel_webhooks:
self.logger.warning("No Discord channel mappings configured. Discord bridge will not post any messages.")
@@ -240,6 +273,14 @@ class DiscordBridgeService(BaseServicePlugin):
self.logger.error("Cannot subscribe to events - meshcore not available")
return
# Initialize message queues for each webhook
for webhook_url in self.channel_webhooks.values():
self.message_queues[webhook_url] = deque()
self.send_times[webhook_url] = deque()
# Start background queue processor task
self._queue_processor_task = asyncio.create_task(self._process_message_queues())
self._running = True
self.logger.info(f"Discord bridge service started (bridging {len(self.channel_webhooks)} channels)")
@@ -251,6 +292,14 @@ class DiscordBridgeService(BaseServicePlugin):
self.logger.info("Stopping Discord bridge service...")
self._running = False
# Cancel background tasks
if self._queue_processor_task:
self._queue_processor_task.cancel()
try:
await self._queue_processor_task
except asyncio.CancelledError:
pass
# Close aiohttp session
if self.http_session:
await self.http_session.close()
@@ -323,15 +372,18 @@ class DiscordBridgeService(BaseServicePlugin):
# Clean up MeshCore @ mentions: @[username] → **@username**
message_text = self._format_mentions(message_text)
# Post to Discord
await self._post_to_webhook(webhook_url, message_text, channel_name, sender_name)
# Queue message for posting (with rate limiting and retry logic)
await self._queue_message(webhook_url, message_text, channel_name, sender_name)
except Exception as e:
self.logger.error(f"Error handling mesh channel message: {e}", exc_info=True)
async def _post_to_webhook(self, webhook_url: str, message: str, channel_name: str, sender_name: str = None) -> None:
"""Post message to Discord webhook.
async def _queue_message(self, webhook_url: str, message: str, channel_name: str, sender_name: str = None) -> None:
"""Queue a message for posting to Discord webhook.
Messages are queued and processed by a background task that handles
rate limiting, retries, and backoff.
Args:
webhook_url: Discord webhook URL.
message: Message text to post.
@@ -340,40 +392,173 @@ class DiscordBridgeService(BaseServicePlugin):
"""
try:
# Prepare webhook payload
# Use sender's name as webhook username for better visual separation
username = sender_name if sender_name else f"MeshCore [{channel_name}]"
# Generate unique avatar for each user based on username
avatar_url = self._generate_avatar_url(username)
# Build payload
payload = {
"content": message,
"username": username
}
# Only add avatar_url if we have one (None means use Discord's default colored avatars)
if avatar_url:
payload["avatar_url"] = avatar_url
# Create queued message
queued_msg = QueuedMessage(
webhook_url=webhook_url,
payload=payload,
channel_name=channel_name
)
# Add to queue
if webhook_url not in self.message_queues:
self.message_queues[webhook_url] = []
self.message_queues[webhook_url].append(queued_msg)
self.logger.debug(f"Queued message for Discord [{channel_name}]: {message[:50]}...")
except Exception as e:
self.logger.error(f"Failed to queue message for Discord webhook [{channel_name}]: {e}", exc_info=True)
async def _process_message_queues(self) -> None:
"""Background task to process message queues with rate limiting and retries.
Processes messages from queues, respecting rate limits and retrying failed messages.
"""
while self._running:
try:
current_time = time.time()
# Process each webhook's queue
for webhook_url, queue in list(self.message_queues.items()):
if not queue:
continue
# Clean up old send times (outside rate limit window)
if webhook_url in self.send_times:
send_times = self.send_times[webhook_url]
while send_times and (current_time - send_times[0]) > self.rate_limit_window:
send_times.popleft()
# Check if we can send (proactive rate limiting)
can_send = True
if webhook_url in self.send_times:
recent_sends = len(self.send_times[webhook_url])
if recent_sends >= self.rate_limit_max:
can_send = False
# Calculate wait time until oldest message expires
oldest_send = self.send_times[webhook_url][0]
wait_time = (oldest_send + self.rate_limit_window) - current_time
if wait_time > 0:
self.logger.debug(f"Rate limit throttling [{queue[0].channel_name}]: waiting {wait_time:.1f}s")
if not can_send:
continue
# Find next message ready to be sent (not waiting for retry delay)
queued_msg = None
for msg in queue:
if current_time >= msg.next_retry_at:
queued_msg = msg
break
# If no message is ready, skip this webhook
if queued_msg is None:
continue
# Check if message is too old
age = current_time - queued_msg.first_queued
if age > self.max_queue_age:
# Remove old message from queue
queue.remove(queued_msg)
self.logger.warning(
f"Dropping old message from queue [{queued_msg.channel_name}]: "
f"age {age:.1f}s exceeds max {self.max_queue_age}s"
)
continue
# Try to send the message
success = await self._post_to_webhook(
queued_msg.webhook_url,
queued_msg.payload,
queued_msg.channel_name,
queued_msg
)
if success:
# Success - remove from queue
queue.remove(queued_msg)
# Track send time for rate limiting
if webhook_url not in self.send_times:
self.send_times[webhook_url] = deque()
self.send_times[webhook_url].append(current_time)
else:
# Failed - increment retry count and schedule retry
queued_msg.retry_count += 1
if queued_msg.retry_count > self.max_retries:
# Max retries exceeded - drop message
queue.remove(queued_msg)
self.logger.error(
f"Dropping message after {self.max_retries} retries "
f"[{queued_msg.channel_name}]: {queued_msg.payload['content'][:50]}..."
)
else:
# Calculate exponential backoff delay
delay = self.retry_delay_base * (2 ** (queued_msg.retry_count - 1))
queued_msg.next_retry_at = current_time + delay
self.logger.debug(
f"Message failed, will retry in {delay:.1f}s "
f"(attempt {queued_msg.retry_count}/{self.max_retries}) "
f"[{queued_msg.channel_name}]"
)
# Message stays in queue, will be retried later
# Small delay to prevent tight loop
await asyncio.sleep(0.1)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in message queue processor: {e}", exc_info=True)
await asyncio.sleep(1.0) # Wait a bit before retrying on error
async def _post_to_webhook(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool:
"""Post message to Discord webhook.
Args:
webhook_url: Discord webhook URL.
payload: JSON payload to post.
channel_name: MeshCore channel name (for logging).
queued_msg: Optional queued message object (for retry tracking).
Returns:
bool: True if message was successfully posted, False otherwise.
"""
try:
# Send via aiohttp (async) or requests (sync fallback)
if AIOHTTP_AVAILABLE and self.http_session:
await self._post_async(webhook_url, payload, channel_name)
return await self._post_async(webhook_url, payload, channel_name, queued_msg)
elif REQUESTS_AVAILABLE:
await self._post_sync(webhook_url, payload, channel_name)
return await self._post_sync(webhook_url, payload, channel_name, queued_msg)
else:
self.logger.error("No HTTP library available for posting to Discord")
return False
except Exception as e:
self.logger.error(f"Failed to post to Discord webhook [{channel_name}]: {e}", exc_info=True)
return False
async def _post_async(self, webhook_url: str, payload: Dict[str, str], channel_name: str) -> None:
async def _post_async(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool:
"""Post to webhook using aiohttp (async).
Args:
webhook_url: Discord webhook URL.
payload: JSON payload to post.
channel_name: MeshCore channel name (for logging).
queued_msg: Optional queued message object (for retry tracking).
Returns:
bool: True if message was successfully posted, False otherwise.
"""
try:
async with self.http_session.post(webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as response:
@@ -381,30 +566,50 @@ class DiscordBridgeService(BaseServicePlugin):
if response.status == 204:
# Success (Discord webhooks return 204 No Content on success)
self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...")
# Monitor rate limit headers
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
return True
elif response.status == 429:
# Rate limited
# Rate limited - will be retried by queue processor
retry_after = response.headers.get('Retry-After', 'unknown')
self.logger.warning(f"Discord rate limit hit for [{channel_name}]. Retry after: {retry_after}s")
# If Retry-After is provided, wait that long before next attempt
if retry_after != 'unknown':
try:
retry_after_float = float(retry_after)
# Add delay to queued message if it exists
if queued_msg:
# Store retry delay in queued message metadata
queued_msg.retry_count = max(0, queued_msg.retry_count - 1) # Don't count this as a retry attempt
except (ValueError, TypeError):
pass
return False
else:
# Other error
response_text = await response.text()
self.logger.warning(f"Discord webhook returned {response.status} for [{channel_name}]: {response_text[:200]}")
# Monitor rate limit headers
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
# Monitor rate limit headers even on error
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
return False
except asyncio.TimeoutError:
self.logger.error(f"Timeout posting to Discord webhook [{channel_name}]")
return False
except Exception as e:
self.logger.error(f"Error posting to Discord webhook [{channel_name}]: {e}")
return False
async def _post_sync(self, webhook_url: str, payload: Dict[str, str], channel_name: str) -> None:
async def _post_sync(self, webhook_url: str, payload: Dict[str, str], channel_name: str, queued_msg: Optional[QueuedMessage] = None) -> bool:
"""Post to webhook using requests library (sync fallback).
Args:
webhook_url: Discord webhook URL.
payload: JSON payload to post.
channel_name: MeshCore channel name (for logging).
queued_msg: Optional queued message object (for retry tracking).
Returns:
bool: True if message was successfully posted, False otherwise.
"""
try:
# Run in thread pool to avoid blocking event loop
@@ -418,19 +623,34 @@ class DiscordBridgeService(BaseServicePlugin):
if response.status_code == 204:
# Success
self.logger.debug(f"Posted to Discord [{channel_name}]: {payload['content'][:50]}...")
# Monitor rate limit headers
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
return True
elif response.status_code == 429:
# Rate limited
# Rate limited - will be retried by queue processor
retry_after = response.headers.get('Retry-After', 'unknown')
self.logger.warning(f"Discord rate limit hit for [{channel_name}]. Retry after: {retry_after}s")
# If Retry-After is provided, wait that long before next attempt
if retry_after != 'unknown':
try:
retry_after_float = float(retry_after)
# Add delay to queued message if it exists
if queued_msg:
# Store retry delay in queued message metadata
queued_msg.retry_count = max(0, queued_msg.retry_count - 1) # Don't count this as a retry attempt
except (ValueError, TypeError):
pass
return False
else:
# Other error
self.logger.warning(f"Discord webhook returned {response.status_code} for [{channel_name}]: {response.text[:200]}")
# Monitor rate limit headers
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
# Monitor rate limit headers even on error
self._check_rate_limit_headers(response.headers, webhook_url, channel_name)
return False
except Exception as e:
self.logger.error(f"Error posting to Discord webhook [{channel_name}]: {e}")
return False
def _check_rate_limit_headers(self, headers: Dict[str, str], webhook_url: str, channel_name: str) -> None:
"""Check Discord rate limit headers and log warnings if approaching limit.