smp server: limit by time the queues to export journal messages for

This commit is contained in:
Evgeny Poberezkin
2025-09-12 13:58:31 +01:00
parent 6c66cf367a
commit d3bc0cba4b
6 changed files with 31 additions and 17 deletions

View File

@@ -2111,15 +2111,15 @@ randomId = fmap EntityId . randomId'
saveServerMessages :: Bool -> MsgStore s -> IO ()
saveServerMessages drainMsgs ms = case ms of
StoreMemory STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
Just f -> exportMessages False ms f drainMsgs
Just f -> exportMessages False ms Nothing f drainMsgs
Nothing -> logNote "undelivered messages are not saved"
StoreJournal _ -> logNote "closed journal message storage"
#if defined(dbServerPostgres)
StoreDatabase _ -> logNote "closed postgres message storage"
#endif
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
exportMessages tty st f drainMsgs = do
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> Maybe Int64 -> FilePath -> Bool -> IO ()
exportMessages tty st ttl_ f drainMsgs = do
logNote $ "saving messages to file " <> T.pack f
run $ case st of
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
@@ -2128,7 +2128,7 @@ exportMessages tty st f drainMsgs = do
StoreDatabase ms -> exportDbMessages tty ms
#endif
where
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms ttl_ . saveQueueMsgs get
run :: (Handle -> IO Int) -> IO ()
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
Right n -> logNote $ "messages saved: " <> tshow n
@@ -2174,7 +2174,7 @@ processServerMessages StartOptions {skipWarnings} = do
run processValidateQueue
| otherwise = logWarn "skipping message expiration" $> Nothing
where
run a = unsafeWithAllMsgQueues False ms a `catchAny` \_ -> exitFailure
run a = unsafeWithAllMsgQueues False ms Nothing a `catchAny` \_ -> exitFailure
processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats
processExpireQueue old q = unsafeRunStore q "processExpireQueue" $ do
mq <- getMsgQueue ms q False

View File

@@ -107,7 +107,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
deleteDirIfExists cfgPath
deleteDirIfExists logPath
putStrLn "Deleted configuration and log files"
Journal cmd -> withIniFile $ \ini -> do
Journal cmd ttl_ -> withIniFile $ \ini -> do
msgsDirExists <- doesDirectoryExist storeMsgsJournalDir
msgsFileExists <- doesFileExist storeMsgsFilePath
storeLogFile <- getRequiredStoreLogFile ini
@@ -147,7 +147,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
Right (ASType SQSMemory msType) -> do
ms <- newJournalMsgStore logPath MQStoreCfg
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
exportMessages True (StoreJournal ms) storeMsgsFilePath False
exportMessages True (StoreJournal ms) ttl_ storeMsgsFilePath False
putStrLn "Export completed"
putStrLn $ case msType of
SMSMemory -> "store_messages set to `memory`, start the server."
@@ -160,7 +160,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
exitFailure
ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
exportMessages True (StoreJournal ms) storeMsgsFilePath False
exportMessages True (StoreJournal ms) ttl_ storeMsgsFilePath False
putStrLn "Export completed"
putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)."
Right (ASType SQSPostgres SMSPostgres) -> do
@@ -755,7 +755,7 @@ data CliCommand
| OnlineCert CertOptions
| Start StartOptions
| Delete
| Journal StoreCmd
| Journal StoreCmd (Maybe Int64)
| Database StoreCmd DatabaseTable DBOpts
data StoreCmd = SCImport | SCExport | SCDelete
@@ -779,7 +779,7 @@ cliCommandP cfgPath logPath iniFile =
<> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")"))
<> command "start" (info (Start <$> startOptionsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
<> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files"))
<> command "journal" (info (Journal <$> journalCmdP) (progDesc "Import/export messages to/from journal storage"))
<> command "journal" (info (Journal <$> journalCmdP <*> msgTtlP) (progDesc "Import/export messages to/from journal storage"))
<> command "database" (info (Database <$> databaseCmdP <*> dbTableP <*> dbOptsP defaultDBOpts) (progDesc "Import/export queues to/from PostgreSQL database storage"))
)
where
@@ -941,6 +941,13 @@ cliCommandP cfgPath logPath iniFile =
<> help "Database tables: queues/messages"
<> metavar "TABLE"
)
msgTtlP =
optional $ option
auto
( long "ttl"
<> help "Limit queues to export messages from by TTL seconds"
<> metavar "TTL"
)
parseBasicAuth :: ReadM ServerPassword
parseBasicAuth = eitherReader $ fmap ServerPassword . strDecode . B.pack
entityP :: String -> String -> String -> Parser (Maybe Entity, Maybe Text)

View File

@@ -404,11 +404,18 @@ instance MsgStoreClass (JournalMsgStore s) where
-- This function can only be used in server CLI commands or before server is started.
-- It does not cache queues and is NOT concurrency safe.
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
unsafeWithAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> Maybe Int64 -> (JournalQueue s -> IO a) -> IO a
unsafeWithAllMsgQueues tty ms ttl_ action = case queueStore_ ms of
MQStore st -> withLoadedQueues st run
#if defined(dbServerPostgres)
PQStore st -> foldQueueRecs False tty st $ uncurry (mkQueue ms False) >=> run
PQStore st -> do
foldQueues <- case ttl_ of
Nothing -> pure $ foldQueueRecs False tty st
Just ttl -> do
now <- systemSeconds <$> getSystemTime
let veryOld = now - 2 * ttl - 86400
pure $ foldRecentQueueRecs veryOld tty st
foldQueues $ uncurry (mkQueue ms False) >=> run
#endif
where
run q = do

View File

@@ -104,7 +104,7 @@ instance MsgStoreClass PostgresMsgStore where
withActiveMsgQueues _ _ = error "withActiveMsgQueues not used"
unsafeWithAllMsgQueues _ _ _ = error "unsafeWithAllMsgQueues not used"
unsafeWithAllMsgQueues _ _ _ _ = error "unsafeWithAllMsgQueues not used"
expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats
expireOldMessages _tty ms now ttl =

View File

@@ -80,7 +80,7 @@ instance MsgStoreClass STMMsgStore where
{-# INLINE closeMsgStore #-}
withActiveMsgQueues = withLoadedQueues . queueStore_
{-# INLINE withActiveMsgQueues #-}
unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_
unsafeWithAllMsgQueues _ ms _ = withLoadedQueues $ queueStore_ ms
{-# INLINE unsafeWithAllMsgQueues #-}
expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats

View File

@@ -41,8 +41,8 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
closeMsgStore :: s -> IO ()
withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
-- This function can only be used in server CLI commands or before server is started.
-- tty, store
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
-- tty, store, ttl
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> Maybe Int64 -> (StoreQueue s -> IO a) -> IO a
-- tty, store, now, ttl
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
logQueueStates :: s -> IO ()