From a7a4e278e0ff3bc4d330d1a89dd82e68e5e8e051 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 16 Feb 2025 20:41:11 +0000 Subject: [PATCH] do not close queue state when queue is opened for writing --- simplexmq.cabal | 2 +- src/Simplex/Messaging/Server/Env/STM.hs | 24 +++- src/Simplex/Messaging/Server/Main.hs | 5 +- .../Messaging/Server/MsgStore/Journal.hs | 112 ++++++++++++------ src/Simplex/Messaging/Server/MsgStore/STM.hs | 6 +- .../Messaging/Server/MsgStore/Types.hs | 4 +- tests/CoreTests/MsgStoreTests.hs | 56 ++++++++- 7 files changed, 154 insertions(+), 55 deletions(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index af4d10ccb..ef8329028 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.3.0.500 +version: 6.3.0.501 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 4044e0d22..aa1dec948 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -25,7 +25,7 @@ import Data.List (intercalate) import Data.List.NonEmpty (NonEmpty) import Data.Maybe (isJust, isNothing) import qualified Data.Text as T -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (addUTCTime, getCurrentTime, nominalDay) import Data.Time.Clock.System (SystemTime) import qualified Data.X509 as X import Data.X509.Validation (Fingerprint (..)) @@ -297,9 +297,9 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt msgStore@(AMS _ store) <- case msgStoreType of AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of - Just storePath -> - let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} - in AMS SMSJournal <$> newMsgStore cfg + Just storePath -> do + cfg <- mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval + AMS SMSJournal <$> newMsgStore cfg Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO random <- C.newRandom @@ -357,6 +357,22 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt | isJust storeMsgsFile = SPMMessages | otherwise = SPMQueues +mkJournalStoreConfig :: FilePath -> Int -> Int -> Int -> Int64 -> IO JournalStoreConfig +mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval = do + expireBefore <- addUTCTime (-14 * nominalDay) <$> getCurrentTime + pure + JournalStoreConfig + { storePath, + quota = msgQueueQuota, + pathParts = journalMsgStoreDepth, + maxMsgCount = maxJournalMsgCount, + maxStateLines = maxJournalStateLines, + stateTailSize = defaultStateTailSize, + idleInterval = idleQueueInterval, + expireBefore, + keepMinBackups = 2 + } + newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 1d21ffa6a..7834ed023 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -44,7 +44,6 @@ import Simplex.Messaging.Server.CLI import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Information -import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..)) import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore) import Simplex.Messaging.Server.QueueStore.STM (readQueueStore) import Simplex.Messaging.Transport (simplexMQVersion, supportedProxyClientSMPRelayVRange, supportedServerSMPRelayVRange) @@ -147,7 +146,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = doesFileExist iniFile >>= \case True -> readIniFile iniFile >>= either exitError a _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." - newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration} + newJournalMsgStore = do + cfg <- mkJournalStoreConfig storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration + newMsgStore cfg iniFile = combine cfgPath "smp-server.ini" serverVersion = "SMP server v" <> simplexMQVersion defaultServerPorts = "5223,443" diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index d6b7dc902..bb1a2626e 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -48,12 +48,13 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (intercalate) -import Data.Maybe (catMaybes, fromMaybe, isNothing) +import Data.List (intercalate, sort) +import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe) +import Data.Text (Text) import qualified Data.Text as T -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) -import Data.Time.Format.ISO8601 (iso8601Show) +import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM) import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (getMapLock, withLockMap) import Simplex.Messaging.Agent.Lock @@ -68,7 +69,7 @@ import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory import System.Exit -import System.FilePath (()) +import System.FilePath (takeFileName, ()) import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) @@ -91,7 +92,10 @@ data JournalStoreConfig = JournalStoreConfig maxStateLines :: Int, stateTailSize :: Int, -- time in seconds after which the queue will be closed after message expiration - idleInterval :: Int64 + idleInterval :: Int64, + -- expire state backup files + expireBefore :: UTCTime, + keepMinBackups :: Int } data JournalQueue = JournalQueue @@ -265,7 +269,7 @@ instance MsgStoreClass JournalMsgStore where r' <- case strDecode $ B.pack queueId of Right rId -> getQueue ms SRecipient rId >>= \case - Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q + Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q Left AUTH -> do logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir removeQueueDirectory_ dir @@ -307,15 +311,15 @@ instance MsgStoreClass JournalMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO JournalMsgQueue - getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} = + getMsgQueue :: JournalMsgStore -> JournalQueue -> Bool -> StoreIO JournalMsgQueue + getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} forWrite = StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where newQ = do let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) queue = JMQueue {queueDirectory = dir, statePath} - q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue) + q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue) atomically $ writeTVar msgQueue_ $ Just q pure q where @@ -342,7 +346,7 @@ instance MsgStoreClass JournalMsgStore where pure r where peek = do - mq <- getMsgQueue ms q + mq <- getMsgQueue ms q False (mq,) <$$> tryPeekMsg_ q mq -- only runs action if queue is not empty @@ -390,7 +394,7 @@ instance MsgStoreClass JournalMsgStore where writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do - q <- getMsgQueue ms q' + q <- getMsgQueue ms q' True StoreIO $ (`E.finally` updateActiveAt q') $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) let empty = size == 0 @@ -488,25 +492,41 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op -openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue -openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do +openMsgQueue :: JournalMsgStore -> JMQueue -> Bool -> IO JournalMsgQueue +openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} forWrite = do (st, sh) <- readWriteQueueState ms statePath - let MsgQueueState {readState = rs, writeState = ws, size} = st - if size == 0 - then E.uninterruptibleMask_ $ do - -- If the queue is empty, journals are deleted and state file is closed - -- Journal will be created again if queue is written to. - -- canWrite is set to True. - st' <- newMsgQueueState <$> newJournalId (random ms) - unsafeAppendState sh st' - hClose sh - removeJournalIfExists dir rs - unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws - mkJournalQueue q st' Nothing + removeBackups ms q + if size st == 0 + then do + (st', hs_) <- removeJournals st sh + mkJournalQueue q st' hs_ else do (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} mkJournalQueue q st' (Just hs) + where + -- If the queue is empty, journals are deleted. + -- New journal is created if queue is written to. + -- canWrite is set to True. + removeJournals MsgQueueState {readState = rs, writeState = ws} sh = E.uninterruptibleMask_ $ do + rjId <- newJournalId $ random ms + let st' = newMsgQueueState rjId + hs_ <- + if forWrite + then Just <$> newJournalHandles st' rjId + else Nothing <$ backupQueueState + removeJournalIfExists dir rs + unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws + pure (st', hs_) + where + newJournalHandles st' rjId = do + appendState_ sh st' + rh <- closeOnException sh $ createNewJournal dir rjId + pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} + backupQueueState = do + hClose sh + ts <- getCurrentTime + renameFile statePath $ stateBackupPath statePath ts mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue mkJournalQueue queue st hs_ = do @@ -540,11 +560,11 @@ updateQueueState q log' hs st a = do atomically $ writeTVar (state q) st >> a appendState :: Handle -> MsgQueueState -> IO () -appendState h = E.uninterruptibleMask_ . unsafeAppendState h +appendState h = E.uninterruptibleMask_ . appendState_ h {-# INLINE appendState #-} -unsafeAppendState :: Handle -> MsgQueueState -> IO () -unsafeAppendState h st = B.hPutStr h $ strEncode st `B.snoc` '\n' +appendState_ :: Handle -> MsgQueueState -> IO () +appendState_ h st = B.hPutStr h $ strEncode st `B.snoc` '\n' updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO () updateReadPos q log' len hs = do @@ -645,16 +665,21 @@ fixFileSize h pos = do | otherwise -> pure () removeJournal :: FilePath -> JournalState t -> IO () -removeJournal dir JournalState {journalId} = safeRemoveFile $ journalFilePath dir journalId +removeJournal dir JournalState {journalId} = + safeRemoveFile "removeJournal" $ journalFilePath dir journalId removeJournalIfExists :: FilePath -> JournalState t -> IO () removeJournalIfExists dir JournalState {journalId} = do let path = journalFilePath dir journalId - whenM (doesFileExist path) $ safeRemoveFile path + handleError "removeJournalIfExists" path $ + whenM (doesFileExist path) $ removeFile path -safeRemoveFile :: FilePath -> IO () -safeRemoveFile path = - removeFile path `catchAny` (\e -> logError $ "STORE: removeJournal, " <> T.pack path <> ", " <> tshow e) +safeRemoveFile :: Text -> FilePath -> IO () +safeRemoveFile cxt path = handleError cxt path $ removeFile path + +handleError :: Text -> FilePath -> IO () -> IO () +handleError cxt path a = + a `catchAny` \e -> logError $ "STORE: " <> cxt <> ", " <> T.pack path <> ", " <> tshow e -- This function is supposed to be resilient to crashes while updating state files, -- and also resilient to crashes during its execution. @@ -703,7 +728,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath = renameFile statePath tempBackup -- 1) temp backup r <- writeQueueState st -- 2) save state ts <- getCurrentTime - renameFile tempBackup (statePath <> "." <> iso8601Show ts <> ".bak") -- 3) timed backup + renameFile tempBackup $ stateBackupPath statePath ts -- 3) timed backup pure r writeQueueState st = do sh <- openFile statePath AppendMode @@ -718,6 +743,20 @@ readWriteQueueState JournalMsgStore {random, config} statePath = then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz else B.hGet h (fromIntegral size) +removeBackups :: JournalMsgStore -> JMQueue -> IO () +removeBackups ms JMQueue {queueDirectory = dir, statePath} = do + times <- sort . mapMaybe backupPathTime <$> listDirectory dir + let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times + mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete + where + JournalMsgStore {config = JournalStoreConfig {expireBefore, keepMinBackups}} = ms + backupPathTime :: FilePath -> Maybe UTCTime + backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack + statePathPfx = T.pack $ takeFileName statePath <> "." + +stateBackupPath :: FilePath -> UTCTime -> FilePath +stateBackupPath statePath ts = statePath <> "." <> iso8601Show ts <> ".bak" + validQueueState :: MsgQueueState -> Bool validQueueState MsgQueueState {readState = rs, writeState = ws, size} | journalId rs == journalId ws = @@ -764,8 +803,7 @@ removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st removeQueueDirectory_ :: FilePath -> IO () removeQueueDirectory_ dir = - removePathForcibly dir `catchAny` \e -> - logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e + handleError "removeQueueDirectory" dir $ removePathForcibly dir hAppend :: Handle -> Int64 -> ByteString -> IO () hAppend h pos s = do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 2df41a9f1..05ab31475 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -89,8 +89,8 @@ instance MsgStoreClass STMMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue - getMsgQueue _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure + getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue + getMsgQueue _ STMQueue {msgQueue_} _ = readTVar msgQueue_ >>= maybe newQ pure where newQ = do msgQueue <- newTQueue @@ -131,7 +131,7 @@ instance MsgStoreClass STMMsgStore where writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' _logState msg = liftIO $ atomically $ do - STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' + STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' True canWrt <- readTVar canWrite empty <- isEmptyTQueue q if canWrt || empty diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 7317a0fab..679945f55 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -54,7 +54,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where recipientId' :: StoreQueue s -> RecipientId queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) - getMsgQueue :: s -> StoreQueue s -> StoreMonad s (MsgQueue s) + getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s) -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) @@ -119,7 +119,7 @@ withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int deleteExpiredMsgs st q old = isolateQueue q "deleteExpiredMsgs" $ - getMsgQueue st q >>= deleteExpireMsgs_ old q + getMsgQueue st q False >>= deleteExpireMsgs_ old q -- closed and idle queues will be closed after expiration -- returns (expired count, queue size after expiration) diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 5386d257a..106825af8 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -15,6 +15,7 @@ module CoreTests.MsgStoreTests where import AgentTests.FunctionalAPITests (runRight, runRight_) +import Control.Concurrent (threadDelay) import Control.Concurrent.STM import Control.Exception (bracket) import Control.Monad @@ -24,8 +25,10 @@ import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Base64.URL as B64 -import Data.List (isPrefixOf) +import Data.List (isPrefixOf, isSuffixOf) import Data.Maybe (fromJust) +import Data.Time.Calendar (fromGregorian) +import Data.Time.Clock (UTCTime (..), addUTCTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C @@ -59,6 +62,8 @@ msgStoreTests = do it "should switch to write file when read file missing" testReadFileMissingSwitch it "should create write file when missing" testWriteFileMissing it "should create read file when read and write files are missing" testReadAndWriteFilesMissing + describe "Journal message store: queue state backup expiration" $ do + it "should remove old queue state backups" testRemoveQueueStateBackups where someMsgStoreTests :: STMStoreClass s => SpecWith s someMsgStoreTests = do @@ -80,7 +85,9 @@ testJournalStoreCfg = maxMsgCount = 4, maxStateLines = 2, stateTailSize = 256, - idleInterval = 21600 + idleInterval = 21600, + expireBefore = UTCTime (fromGregorian 2025 1 1) 0, + keepMinBackups = 3 } mkMessage :: MonadIO m => ByteString -> m Message @@ -328,7 +335,7 @@ testRemoveJournals ms = do runRight $ do q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" - Just (Message {msgId = mId2}, False) <- write q "message 2" + Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 liftIO $ closeMsgQueue q @@ -344,7 +351,7 @@ testRemoveJournals ms = do Nothing <- tryPeekMsg ms q -- still not removed, queue is empty and not opened liftIO $ journalFilesCount dir `shouldReturn` 1 - _mq <- isolateQueue q "test" $ getMsgQueue ms q + _mq <- isolateQueue q "test" $ getMsgQueue ms q False -- journal is removed liftIO $ journalFilesCount dir `shouldReturn` 0 Just (Message {msgId = mId3}, True) <- write q "message 3" @@ -372,11 +379,48 @@ testRemoveJournals ms = do journalFilesCount dir `shouldReturn` 1 runRight $ do q <- ExceptT $ getQueue ms SRecipient rId - _mq <- isolateQueue q "test" $ getMsgQueue ms q - liftIO $ journalFilesCount dir `shouldReturn` 0 + Just (Message {}, True) <- write q "message 8" + liftIO $ journalFilesCount dir `shouldReturn` 1 + liftIO $ closeMsgQueue q where journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir +testRemoveQueueStateBackups :: IO () +testRemoveQueueStateBackups = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + + expireBefore <- addUTCTime 1 <$> getCurrentTime -- expire all backups created withing one second + let cfg = testJournalStoreCfg {maxStateLines = 1, expireBefore, keepMinBackups = 0} + ms <- newMsgStore cfg + + let dir = msgQueueDirectory ms rId + write q s = writeMsg ms q True =<< mkMessage s + + runRight $ do + q <- ExceptT $ addQueue ms rId qr + Just (Message {msgId = mId1}, True) <- write q "message 1" + Just (Message {msgId = mId2}, False) <- write q "message 2" + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 + (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 + liftIO $ closeMsgQueue q + liftIO $ stateBackupCount dir `shouldReturn` 0 + + q1 <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, True) <- write q1 "message 3" + Just (Message {}, False) <- write q1 "message 4" + liftIO $ closeMsgQueue q1 + liftIO $ stateBackupCount dir `shouldReturn` 0 + + liftIO $ threadDelay 1000000 + q2 <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, False) <- write q2 "message 5" + Nothing <- write q2 "message 5" + liftIO $ closeMsgQueue q2 + liftIO $ stateBackupCount dir `shouldReturn` 1 + where + stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir + testReadFileMissing :: JournalMsgStore -> IO () testReadFileMissing ms = do g <- C.newRandom