From cebf2cbc8bb80b2236c77cd19b8d87d76c37737d Mon Sep 17 00:00:00 2001 From: agessaman Date: Sun, 22 Mar 2026 13:16:00 -0700 Subject: [PATCH] Implement locking mechanism for message processing in FeedManager - Introduced a lock to serialize access to the message processing queue, preventing concurrent executions that could lead to race conditions. - Updated the process_message_queue method to utilize the new lock, ensuring safe access to the inner processing logic. - Increased the timeout for awaiting feed message queue processing in the MessageScheduler to 600 seconds, accommodating longer processing times for queued items. --- modules/feed_manager.py | 10 ++++++++++ modules/scheduler.py | 11 ++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/modules/feed_manager.py b/modules/feed_manager.py index 0f8ed83..31434ce 100644 --- a/modules/feed_manager.py +++ b/modules/feed_manager.py @@ -66,6 +66,9 @@ class FeedManager: # Semaphore to limit concurrent requests self._request_semaphore = asyncio.Semaphore(5) + # Serialize process_message_queue (scheduler may schedule another run if result() times out) + self._process_queue_lock: Optional[asyncio.Lock] = None + self.logger.info("FeedManager initialized") async def initialize(self): @@ -1214,6 +1217,13 @@ class FeedManager: async def process_message_queue(self): """Process queued feed messages and send them at configured intervals""" + if self._process_queue_lock is None: + self._process_queue_lock = asyncio.Lock() + async with self._process_queue_lock: + await self._process_message_queue_inner() + + async def _process_message_queue_inner(self): + """Body of process_message_queue (runs under _process_queue_lock).""" try: # Get all unsent messages, ordered by priority and queue time db_path = str(self.db_path) # Ensure string, not Path object diff --git a/modules/scheduler.py b/modules/scheduler.py index 3894e11..25efff0 100644 --- a/modules/scheduler.py +++ b/modules/scheduler.py @@ -15,6 +15,9 @@ from typing import Dict, Tuple, Any from pathlib import Path from .utils import decode_escape_sequences, format_keyword_response_with_placeholders, get_config_timezone +# process_message_queue may await long per-feed intervals across many queued items; 30s is too short. +_FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT = 600 + class MessageScheduler: """Manages scheduled messages and timing""" @@ -420,7 +423,13 @@ class MessageScheduler: self.bot.main_event_loop ) try: - future.result(timeout=30) # 30 second timeout + future.result(timeout=_FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT) + except TimeoutError: + self.logger.warning( + "Timed out waiting for feed message queue after %ss; " + "work may still be running on the main loop (per-feed send spacing).", + _FEED_MESSAGE_QUEUE_FUTURE_TIMEOUT, + ) except Exception as e: self.logger.exception(f"Error processing message queue: {e}") else: