From d3bc0cba4bfb409ba064e8c80e2e11cb901cd473 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 12 Sep 2025 13:58:31 +0100 Subject: [PATCH] smp server: limit by time the queues to export journal messages for --- src/Simplex/Messaging/Server.hs | 10 +++++----- src/Simplex/Messaging/Server/Main.hs | 17 ++++++++++++----- .../Messaging/Server/MsgStore/Journal.hs | 13 ++++++++++--- .../Messaging/Server/MsgStore/Postgres.hs | 2 +- src/Simplex/Messaging/Server/MsgStore/STM.hs | 2 +- src/Simplex/Messaging/Server/MsgStore/Types.hs | 4 ++-- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 7d6e00ab0..df0c3ac94 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -2111,15 +2111,15 @@ randomId = fmap EntityId . randomId' saveServerMessages :: Bool -> MsgStore s -> IO () saveServerMessages drainMsgs ms = case ms of StoreMemory STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of - Just f -> exportMessages False ms f drainMsgs + Just f -> exportMessages False ms Nothing f drainMsgs Nothing -> logNote "undelivered messages are not saved" StoreJournal _ -> logNote "closed journal message storage" #if defined(dbServerPostgres) StoreDatabase _ -> logNote "closed postgres message storage" #endif -exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO () -exportMessages tty st f drainMsgs = do +exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> Maybe Int64 -> FilePath -> Bool -> IO () +exportMessages tty st ttl_ f drainMsgs = do logNote $ "saving messages to file " <> T.pack f run $ case st of StoreMemory ms -> exportMessages_ ms $ getMsgs ms @@ -2128,7 +2128,7 @@ exportMessages tty st f drainMsgs = do StoreDatabase ms -> exportDbMessages tty ms #endif where - exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get + exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms ttl_ . saveQueueMsgs get run :: (Handle -> IO Int) -> IO () run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case Right n -> logNote $ "messages saved: " <> tshow n @@ -2174,7 +2174,7 @@ processServerMessages StartOptions {skipWarnings} = do run processValidateQueue | otherwise = logWarn "skipping message expiration" $> Nothing where - run a = unsafeWithAllMsgQueues False ms a `catchAny` \_ -> exitFailure + run a = unsafeWithAllMsgQueues False ms Nothing a `catchAny` \_ -> exitFailure processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats processExpireQueue old q = unsafeRunStore q "processExpireQueue" $ do mq <- getMsgQueue ms q False diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 8e5fd55ee..0d5500f21 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -107,7 +107,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = deleteDirIfExists cfgPath deleteDirIfExists logPath putStrLn "Deleted configuration and log files" - Journal cmd -> withIniFile $ \ini -> do + Journal cmd ttl_ -> withIniFile $ \ini -> do msgsDirExists <- doesDirectoryExist storeMsgsJournalDir msgsFileExists <- doesFileExist storeMsgsFilePath storeLogFile <- getRequiredStoreLogFile ini @@ -147,7 +147,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = Right (ASType SQSMemory msType) -> do ms <- newJournalMsgStore logPath MQStoreCfg readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms - exportMessages True (StoreJournal ms) storeMsgsFilePath False + exportMessages True (StoreJournal ms) ttl_ storeMsgsFilePath False putStrLn "Export completed" putStrLn $ case msType of SMSMemory -> "store_messages set to `memory`, start the server." @@ -160,7 +160,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr exitFailure ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} - exportMessages True (StoreJournal ms) storeMsgsFilePath False + exportMessages True (StoreJournal ms) ttl_ storeMsgsFilePath False putStrLn "Export completed" putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)." Right (ASType SQSPostgres SMSPostgres) -> do @@ -755,7 +755,7 @@ data CliCommand | OnlineCert CertOptions | Start StartOptions | Delete - | Journal StoreCmd + | Journal StoreCmd (Maybe Int64) | Database StoreCmd DatabaseTable DBOpts data StoreCmd = SCImport | SCExport | SCDelete @@ -779,7 +779,7 @@ cliCommandP cfgPath logPath iniFile = <> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")")) <> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) <> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files")) - <> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage")) + <> command "journal" (info (Journal <$> journalCmdP <*> msgTtlP) (progDesc "Import/export messages to/from journal storage")) <> command "database" (info (Database <$> databaseCmdP <*> dbTableP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage")) ) where @@ -941,6 +941,13 @@ cliCommandP cfgPath logPath iniFile = <> help "Database tables: queues/messages" <> metavar "TABLE" ) + msgTtlP = + optional $ option + auto + ( long "ttl" + <> help "Limit queues to export messages from by TTL seconds" + <> metavar "TTL" + ) parseBasicAuth :: ReadM ServerPassword parseBasicAuth = eitherReader $ fmap ServerPassword . strDecode . B.pack entityP :: String -> String -> String -> Parser (Maybe Entity, Maybe Text) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 3a639238b..5a2b95915 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -404,11 +404,18 @@ instance MsgStoreClass (JournalMsgStore s) where -- This function can only be used in server CLI commands or before server is started. -- It does not cache queues and is NOT concurrency safe. - unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a - unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of + unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> Maybe Int64 -> (JournalQueue s -> IO a) -> IO a + unsafeWithAllMsgQueues tty ms ttl_ action = case queueStore_ ms of MQStore st -> withLoadedQueues st run #if defined(dbServerPostgres) - PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run + PQStore st -> do + foldQueues <- case ttl_ of + Nothing -> pure $ foldQueueRecs False tty st + Just ttl -> do + now <- systemSeconds <$> getSystemTime + let veryOld = now - 2 * ttl - 86400 + pure $ foldRecentQueueRecs veryOld tty st + foldQueues $ uncurry (mkQueue ms False) >=> run #endif where run q = do diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index 14dd3977a..b9eaf8535 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -104,7 +104,7 @@ instance MsgStoreClass PostgresMsgStore where withActiveMsgQueues _ _ = error "withActiveMsgQueues not used" - unsafeWithAllMsgQueues _ _ _ = error "unsafeWithAllMsgQueues not used" + unsafeWithAllMsgQueues _ _ _ _ = error "unsafeWithAllMsgQueues not used" expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats expireOldMessages _tty ms now ttl = diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 73e1bf398..6e685e8de 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -80,7 +80,7 @@ instance MsgStoreClass STMMsgStore where {-# INLINE closeMsgStore #-} withActiveMsgQueues = withLoadedQueues . queueStore_ {-# INLINE withActiveMsgQueues #-} - unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_ + unsafeWithAllMsgQueues _ ms _ = withLoadedQueues $ queueStore_ ms {-# INLINE unsafeWithAllMsgQueues #-} expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 98c12d4be..fbd6b59b7 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -41,8 +41,8 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M closeMsgStore :: s -> IO () withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a -- This function can only be used in server CLI commands or before server is started. - -- tty, store - unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a + -- tty, store, ttl + unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> Maybe Int64 -> (StoreQueue s -> IO a) -> IO a -- tty, store, now, ttl expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats logQueueStates :: s -> IO ()