diff --git a/modules/feed_manager.py b/modules/feed_manager.py index 0f8ed83..31434ce 100644 --- a/modules/feed_manager.py +++ b/modules/feed_manager.py @@ -66,6 +66,9 @@ class FeedManager: # Semaphore to limit concurrent requests self._request_semaphore = asyncio.Semaphore(5) + # Serialize process_message_queue (scheduler may schedule another run if result() times out) + self._process_queue_lock: Optional[asyncio.Lock] = None + self.logger.info("FeedManager initialized") async def initialize(self): @@ -1214,6 +1217,13 @@ class FeedManager: async def process_message_queue(self): """Process queued feed messages and send them at configured intervals""" + if self._process_queue_lock is None: + self._process_queue_lock = asyncio.Lock() + async with self._process_queue_lock: + await self._process_message_queue_inner() + + async def _process_message_queue_inner(self): + """Body of process_message_queue (runs under _process_queue_lock).""" try: # Get all unsent messages, ordered by priority and queue time db_path = str(self.db_path) # Ensure string, not Path object diff --git a/modules/scheduler.py b/modules/scheduler.py index 3894e11..25efff0 100644 --- a/modules/scheduler.py +++ b/modules/scheduler.py @@ -15,6 +15,9 @@ from typing import Dict, Tuple, Any from pathlib import Path from .utils import decode_escape_sequences, format_keyword_response_with_placeholders, get_config_timezone +# process_message_queue may await long per-feed intervals across many queued items; 30s is too short. +_FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT = 600 + class MessageScheduler: """Manages scheduled messages and timing""" @@ -420,7 +423,13 @@ class MessageScheduler: self.bot.main_event_loop ) try: - future.result(timeout=30) # 30 second timeout + future.result(timeout=_FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT) + except TimeoutError: + self.logger.warning( + "Timed out waiting for feed message queue after %ss; " + "work may still be running on the main loop (per-feed send spacing).", + _FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT, + ) except Exception as e: self.logger.exception(f"Error processing message queue: {e}") else: