From 614fa2b163326a0c86b83394db604424c0fb4d4c Mon Sep 17 00:00:00 2001 From: Evgeny Date: Thu, 20 Mar 2025 08:57:47 +0000 Subject: [PATCH] smp server: reduce queue expiration/idle intervals, skip expiring very old queues (#1488) * smp server: reduce idle queue interval and queue expiration interval * only expire recent queues (not very old) * fix * version --- src/Simplex/Messaging/Server.hs | 27 ++------------- src/Simplex/Messaging/Server/Env/STM.hs | 4 +-- src/Simplex/Messaging/Server/Main.hs | 2 +- .../Messaging/Server/MsgStore/Journal.hs | 34 +++++++++++-------- src/Simplex/Messaging/Server/MsgStore/STM.hs | 11 +++--- .../Messaging/Server/MsgStore/Types.hs | 34 +++++++++++++------ .../Messaging/Server/QueueStore/Postgres.hs | 9 +++-- .../Server/QueueStore/Postgres/Migrations.hs | 19 ++++++++++- tests/CoreTests/MsgStoreTests.hs | 4 +-- 9 files changed, 81 insertions(+), 63 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 00dc7cba7..e422fef84 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -147,23 +147,6 @@ runSMPServerBlocking started cfg attachHTTP_ = newEnv cfg >>= runReaderT (smpSer type M a = ReaderT Env IO a type AttachHTTP = Socket -> TLS.Context -> IO () -data MessageStats = MessageStats - { storedMsgsCount :: Int, - expiredMsgsCount :: Int, - storedQueues :: Int - } - -instance Monoid MessageStats where - mempty = MessageStats 0 0 0 - {-# INLINE mempty #-} - -instance Semigroup MessageStats where - MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z) - {-# INLINE (<>) #-} - -newMessageStats :: MessageStats -newMessageStats = MessageStats 0 0 0 - smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M () smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOptions} attachHTTP_ = do s <- asks server @@ -389,9 +372,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt expireMessagesThread_ _ = [] expireMessagesThread :: ExpirationConfig -> M () - expireMessagesThread expCfg = do + expireMessagesThread ExpirationConfig {checkInterval, ttl} = do AMS _ _ ms <- asks msgStore - let interval = checkInterval expCfg * 1000000 + let interval = checkInterval * 1000000 stats <- asks serverStats labelMyThread "expireMessagesThread" liftIO $ forever $ expire ms stats interval @@ -402,17 +385,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt logInfo "Started expiring messages..." n <- compactQueues @(StoreQueue s) $ queueStore ms when (n > 0) $ logInfo $ "Removed " <> tshow n <> " old deleted queues from the database." - old <- expireBeforeEpoch expCfg now <- systemSeconds <$> getSystemTime - tryAny (withAllMsgQueues False "idleDeleteExpiredMsgs" ms $ expireQueueMsgs now ms old) >>= \case + tryAny (expireOldMessages False ms now ttl) >>= \case Right msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} -> do atomicWriteIORef (msgCount stats) stored atomicModifyIORef'_ (msgExpired stats) (+ expired) printMessageStats "STORE: messages" msgStats Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e - expireQueueMsgs now ms old q = do - (expired_, stored) <- idleDeleteExpiredMsgs now ms q old - pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} expireNtfsThread :: ServerConfig -> M () expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 160697feb..c1c72a39e 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -147,11 +147,11 @@ defaultMessageExpiration :: ExpirationConfig defaultMessageExpiration = ExpirationConfig { ttl = defMsgExpirationDays * 86400, -- seconds - checkInterval = 14400 -- seconds, 4 hours + checkInterval = 7200 -- seconds, 2 hours } defaultIdleQueueInterval :: Int64 -defaultIdleQueueInterval = 28800 -- seconds, 8 hours +defaultIdleQueueInterval = 14400 -- seconds, 4 hours defNtfExpirationHours :: Int64 defNtfExpirationHours = 24 diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 6c930ba0b..83e59243d 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -210,7 +210,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} ps <- newJournalMsgStore $ PQStoreCfg storeCfg sl <- openWriteStoreLog False storeLogFilePath - Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) + Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) putStrLn $ "Export completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0382089a8..0686a71bf 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -393,7 +393,7 @@ instance MsgStoreClass (JournalMsgStore s) where unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of MQStore st -> withLoadedQueues st run #if defined(dbServerPostgres) - PQStore st -> foldQueueRecs tty st $ uncurry (mkTempQueue ms) >=> run + PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkTempQueue ms) >=> run #endif where run q = do @@ -401,23 +401,32 @@ instance MsgStoreClass (JournalMsgStore s) where closeMsgQueue q pure r - -- This function is concurrency safe, it is used to expire queues. - withAllMsgQueues :: forall a. Monoid a => Bool -> String -> JournalMsgStore s -> (JournalQueue s -> StoreIO s a) -> IO a - withAllMsgQueues tty op ms action = case queueStore_ ms of + -- This function is concurrency safe + expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats + expireOldMessages tty ms now ttl = case queueStore_ ms of MQStore st -> - withLoadedQueues st $ \q -> - run $ isolateQueue q op $ action q + withLoadedQueues st $ \q -> run $ isolateQueue q "deleteExpiredMsgs" $ do + StoreIO (readTVarIO $ queueRec q) >>= \case + Just QueueRec {updatedAt = Just (RoundedSystemTime t)} | t > veryOld -> + expireQueueMsgs ms now old q + _ -> pure newMessageStats #if defined(dbServerPostgres) PQStore st -> do let JournalMsgStore {queueLocks, sharedLock} = ms - foldQueueRecs tty st $ \(rId, qr) -> do + foldQueueRecs tty st (Just veryOld) $ \(rId, qr) -> do q <- mkTempQueue ms rId qr - withSharedWaitLock rId queueLocks sharedLock $ - run $ tryStore' op rId $ unStoreIO $ action q + withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $ + getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old #endif where - run :: ExceptT ErrorType IO a -> IO a - run = fmap (fromRight mempty) . runExceptT + old = now - ttl + veryOld = now - 2 * ttl - 86400 + run :: ExceptT ErrorType IO MessageStats -> IO MessageStats + run = fmap (fromRight newMessageStats) . runExceptT + -- Use cached queue if available. + -- Also see the comment in loadQueue in PostgresQueueStore + getLoadedQueue :: JournalQueue s -> IO (JournalQueue s) + getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms) logQueueStates :: JournalMsgStore s -> IO () logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState @@ -437,9 +446,6 @@ instance MsgStoreClass (JournalMsgStore s) where lock <- atomically $ getMapLock (queueLocks ms) rId makeQueue_ ms rId qr lock - getLoadedQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (JournalQueue s) - getLoadedQueue ms sq = StoreIO $ fromMaybe sq <$> TM.lookupIO (recipientId sq) (loadedQueues $ queueStore_ ms) - getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s) getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite = StoreIO $ readTVarIO msgQueue' >>= maybe newQ pure diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 43a41d7ca..6fa94fd03 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -82,8 +82,11 @@ instance MsgStoreClass STMMsgStore where {-# INLINE withActiveMsgQueues #-} unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_ {-# INLINE unsafeWithAllMsgQueues #-} - withAllMsgQueues _tty _op ms action = withLoadedQueues (queueStore_ ms) $ atomically . action - {-# INLINE withAllMsgQueues #-} + + expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats + expireOldMessages _tty ms now ttl = + withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl) + logQueueStates _ = pure () {-# INLINE logQueueStates #-} logQueueState _ = pure () @@ -94,10 +97,6 @@ instance MsgStoreClass STMMsgStore where mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing {-# INLINE mkQueue #-} - getLoadedQueue :: STMMsgStore -> STMQueue -> STM STMQueue - getLoadedQueue _ = pure - {-# INLINE getLoadedQueue #-} - getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue getMsgQueue _ STMQueue {msgQueue'} _ = readTVar msgQueue' >>= maybe newQ pure where diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 514b67d7b..01dfdb88c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -21,6 +21,7 @@ import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) import Data.Kind +import Data.Maybe (fromMaybe) import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -37,14 +38,14 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a -- This function can only be used in server CLI commands or before server is started. unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a - withAllMsgQueues :: Monoid a => Bool -> String -> s -> (StoreQueue s -> StoreMonad s a) -> IO a + -- tty, store, now, ttl + expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueStore :: s -> QueueStore s -- message store methods mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s) - getLoadedQueue :: s -> StoreQueue s -> StoreMonad s (StoreQueue s) getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s)) getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message)) @@ -73,6 +74,23 @@ data SQSType :: QSType -> Type where SQSMemory :: SQSType 'QSMemory SQSPostgres :: SQSType 'QSPostgres +data MessageStats = MessageStats + { storedMsgsCount :: Int, + expiredMsgsCount :: Int, + storedQueues :: Int + } + +instance Monoid MessageStats where + mempty = MessageStats 0 0 0 + {-# INLINE mempty #-} + +instance Semigroup MessageStats where + MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z) + {-# INLINE (<>) #-} + +newMessageStats :: MessageStats +newMessageStats = MessageStats 0 0 0 + addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) addQueue st = addQueue_ (queueStore st) (mkQueue st) {-# INLINE addQueue #-} @@ -122,14 +140,10 @@ deleteExpiredMsgs st q old = isolateQueue q "deleteExpiredMsgs" $ getMsgQueue st q False >>= deleteExpireMsgs_ old q --- closed and idle queues will be closed after expiration --- returns (expired count, queue size after expiration) -idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> StoreQueue s -> Int64 -> StoreMonad s (Maybe Int, Int) -idleDeleteExpiredMsgs now st q old = do - -- Use cached queue if available. - -- Also see the comment in loadQueue in PostgresQueueStore - q' <- getLoadedQueue st q - withIdleMsgQueue now st q' $ deleteExpireMsgs_ old q' +expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats +expireQueueMsgs st now old q = do + (expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q + pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s Int deleteExpireMsgs_ old q mq = do diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 3062e2313..93d6e8213 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -324,10 +324,10 @@ insertQueueQuery = VALUES (?,?,?,?,?,?,?,?,?,?,?) |] -foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a -foldQueueRecs tty st f = do +foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> Maybe Int64 -> ((RecipientId, QueueRec) -> IO a) -> IO a +foldQueueRecs tty st skipOld_ f = do (n, r) <- withConnection (dbStore st) $ \db -> - DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") (0 :: Int, mempty) $ \(i, acc) row -> do + foldRecs db (0 :: Int, mempty) $ \(i, acc) row -> do r <- f $ rowToQueueRec row let !i' = i + 1 !acc' = acc <> r @@ -336,6 +336,9 @@ foldQueueRecs tty st f = do when tty $ putStrLn $ progress n pure r where + foldRecs db = case skipOld_ of + Nothing -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") + Just old -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) progress i = "Processed: " <> show i <> " records" queueRecQuery :: Query diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index 03b6fecf6..a5b69b94b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -11,7 +11,8 @@ import Text.RawString.QQ (r) serverSchemaMigrations :: [(String, Text, Maybe Text)] serverSchemaMigrations = - [ ("20250207_initial", m20250207_initial, Nothing) + [ ("20250207_initial", m20250207_initial, Nothing), + ("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index) ] -- | The list of migrations in ascending order by date @@ -44,3 +45,19 @@ CREATE UNIQUE INDEX idx_msg_queues_sender_id ON msg_queues(sender_id); CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON msg_queues(notifier_id); CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at); |] + +m20250319_updated_index :: Text +m20250319_updated_index = + T.pack + [r| +DROP INDEX idx_msg_queues_deleted_at; +CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at); + |] + +down_m20250319_updated_index :: Text +down_m20250319_updated_index = + T.pack + [r| +DROP INDEX idx_msg_queues_updated_at; +CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at); + |] diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index adde9ae2b..1f9cbf777 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -32,7 +32,7 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol (EntityId (..), Message (..), RecipientId, SParty (..), noMsgFlags) -import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages, printMessageStats) +import Simplex.Messaging.Server (exportMessages, importMessages, printMessageStats) import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..), expireBeforeEpoch) import Simplex.Messaging.Server.MsgStore.Journal @@ -453,7 +453,7 @@ testExpireIdleQueues = do old <- expireBeforeEpoch ExpirationConfig {ttl = 1, checkInterval = 1} -- no old messages now <- systemSeconds <$> getSystemTime - (expired_, stored) <- runRight $ isolateQueue q "" $ idleDeleteExpiredMsgs now ms q old + (expired_, stored) <- runRight $ isolateQueue q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q expired_ `shouldBe` Just 0 stored `shouldBe` 0 (Nothing, False) <- readQueueState ms statePath