From a1f4de9eccda4a8da35ba943aeea034f18a96419 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 9 Sep 2025 18:46:23 +0100 Subject: [PATCH] cleanup --- src/Simplex/Messaging/Server.hs | 43 +++--- src/Simplex/Messaging/Server/Main.hs | 49 +++---- .../Messaging/Server/MsgStore/Journal.hs | 135 ++++-------------- .../Messaging/Server/MsgStore/Types.hs | 1 + tests/CoreTests/MsgStoreTests.hs | 23 ++- 5 files changed, 77 insertions(+), 174 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 3f91f08ba..5814d546b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM as Env import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, msgQueueDirectory) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore @@ -128,7 +128,7 @@ import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Concurrent -import UnliftIO.Directory (doesDirectoryExist, doesFileExist, renameFile) +import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -2111,31 +2111,28 @@ saveServerMessages drainMsgs ms = case ms of StoreJournal _ -> logNote "closed journal message storage" exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO () -exportMessages tty ms f drainMsgs = do +exportMessages tty st f drainMsgs = do logNote $ "saving messages to file " <> T.pack f - liftIO $ withFile f WriteMode $ \h -> - tryAny (unsafeWithAllMsgQueues tty False ms' $ saveQueueMsgs h) >>= \case - Right (Sum total) -> logNote $ "messages saved: " <> tshow total + run $ case st of + StoreMemory ms -> exportMessages_ ms $ getMsgs ms + StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms + where + exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty True ms . saveQueueMsgs get + run :: (Handle -> IO Int) -> IO () + run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case + Right n -> logNote $ "messages saved: " <> tshow n Left e -> do logError $ "error exporting messages: " <> tshow e exitFailure - where - ms' :: s - ms' = case ms of - StoreMemory s -> s - StoreJournal s -> s - getMessages q = case ms of - StoreMemory _ -> getMsgs - StoreJournal _ -> - ifM - (doesDirectoryExist $ msgQueueDirectory ms' $ recipientId q) - getMsgs - (pure []) - -- getJournalQueueMessages ms' q - where - getMsgs = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False - saveQueueMsgs h q = do - msgs <- getMessages q + getJournalMsgs ms q = + readTVarIO (msgQueue q) >>= \case + Just _ -> getMsgs ms q + Nothing -> getJournalQueueMessages ms q + getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message] + getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False + saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int) + saveQueueMsgs get h q = do + msgs <- get q unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs pure $ Sum $ length msgs encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 1c3765698..c095f7160 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -52,7 +52,7 @@ import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Information import Simplex.Messaging.Server.Main.Init -import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), QStoreCfg (..), exportJournalMessages, stmQueueStore) +import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), QStoreCfg (..), stmQueueStore) import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SQSType (..), SMSType (..), newMsgStore) import Simplex.Messaging.Server.QueueStore.Postgres.Config import Simplex.Messaging.Server.StoreLog.ReadWrite (readQueueStore) @@ -60,11 +60,11 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy) import Simplex.Messaging.Transport.HTTP2 (httpALPN) import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig) -import Simplex.Messaging.Util (eitherToMaybe, ifM, tshow, unlessM) +import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM) import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist) import System.Exit (exitFailure) import System.FilePath (combine) -import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile) +import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) import Text.Read (readMaybe) #if defined(dbServerPostgres) @@ -131,25 +131,24 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, update it to `journal` in INI file" Right (ASType _ SMSJournal) -> "store_messages set to `journal`" Left e -> e <> ", configure storage correctly" - SCExport fast + SCExport | msgsFileExists && msgsDirExists -> exitConfigureMsgStorage | msgsFileExists -> do putStrLn $ storeMsgsFilePath <> " file already exists." exitFailure - | fast -> do - confirmExport - logNote $ "saving messages to file " <> T.pack storeMsgsFilePath - ms <- newJournalMsgStore logPath MQStoreCfg - total <- withFile storeMsgsFilePath WriteMode $ exportJournalMessages True ms - logNote $ "messages saved: " <> tshow total - completedExport | otherwise -> do - confirmExport + confirmOrExit + ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) + "Journal not exported" case readStoreType ini of - Right (ASType SQSMemory _) -> do + Right (ASType SQSMemory msType) -> do ms <- newJournalMsgStore logPath MQStoreCfg readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms exportMessages True (StoreJournal ms) storeMsgsFilePath False + putStrLn "Export completed" + putStrLn $ case msType of + SMSMemory -> "store_messages set to `memory`, start the server." + SMSJournal -> "store_messages set to `journal`, update it to `memory` in INI file" Right (ASType SQSPostgres SMSJournal) -> do #if defined(dbServerPostgres) let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath @@ -159,23 +158,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = exitFailure ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini} exportMessages True (StoreJournal ms) storeMsgsFilePath False + putStrLn "Export completed" + putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)." #else noPostgresExit #endif - Left _ -> pure () - completedExport - where - confirmExport = - confirmOrExit - ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) - "Journal not exported" - completedExport = do - putStrLn "Export completed" - putStrLn $ case readStoreType ini of - Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, start the server." - Right (ASType SQSMemory SMSJournal) -> "store_messages set to `journal`, update it to `memory` in INI file" - Right (ASType SQSPostgres SMSJournal) -> "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)." - Left e -> e <> ", configure storage correctly" + Left e -> putStrLn $ e <> ", configure storage correctly" SCDelete | not msgsDirExists -> do putStrLn $ storeMsgsJournalDir <> " directory does not exists." @@ -214,7 +202,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = where setToDbStr :: String setToDbStr = "store_queues set to `memory`, update it to `database` in INI file" - SCExport _ + SCExport | schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema | not schemaExists -> do putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr @@ -691,7 +679,7 @@ data CliCommand | Journal StoreCmd | Database StoreCmd DBOpts -data StoreCmd = SCImport | SCExport Bool | SCDelete +data StoreCmd = SCImport | SCExport | SCDelete cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand cliCommandP cfgPath logPath iniFile = @@ -852,8 +840,7 @@ cliCommandP cfgPath logPath iniFile = storeCmdP src dest = hsubparser ( command "import" (info (pure SCImport) (progDesc $ "Import " <> src <> " into a new " <> dest)) - <> command "export" (info (pure $ SCExport False) (progDesc $ "Export " <> dest <> " to " <> src)) - <> command "fast-export" (info (pure $ SCExport True) (progDesc $ "Fast export of " <> dest <> " to " <> src)) + <> command "export" (info (pure SCExport) (progDesc $ "Export " <> dest <> " to " <> src)) <> command "delete" (info (pure SCDelete) (progDesc $ "Delete " <> dest)) ) parseBasicAuth :: ReadM ServerPassword diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 3fca45c04..82eb13d1d 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -38,6 +38,7 @@ module Simplex.Messaging.Server.MsgStore.Journal msgQueueStatePath, readQueueState, newMsgQueueState, + getJournalQueueMessages, newJournalId, appendState, queueLogFileName, @@ -47,9 +48,6 @@ module Simplex.Messaging.Server.MsgStore.Journal #if defined(dbServerPostgres) postgresQueueStore, #endif - exportJournalMessages, - getJournalQueueMessages, - encodeMessages, ) where @@ -59,16 +57,14 @@ import Control.Logger.Simple import Control.Monad import Control.Monad.Trans.Except import qualified Data.Attoparsec.ByteString.Char8 as A -import qualified Data.ByteString.Builder as BLD import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Char (ord) import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (sort, sortBy) +import Data.List (sort) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe) +import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) @@ -80,7 +76,6 @@ import Simplex.Messaging.Agent.Client (getMapLock) import Simplex.Messaging.Agent.Lock import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol -import Simplex.Messaging.Server.MsgStore import Simplex.Messaging.Server.MsgStore.Journal.SharedLock import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -93,9 +88,8 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory -import System.Exit (exitFailure) import System.FilePath (takeFileName, ()) -import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) +import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..)) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) @@ -492,7 +486,7 @@ instance MsgStoreClass (JournalMsgStore s) where where newQ = do let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) + statePath = msgQueueStatePath dir rId queue = JMQueue {queueDirectory = dir, statePath} q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue) atomically $ writeTVar msgQueue' $ Just q @@ -820,8 +814,8 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP let (seg, s') = B.splitAt 2 s in seg : splitSegments (n - 1) s' -msgQueueStatePath :: FilePath -> String -> FilePath -msgQueueStatePath dir queueId = dir (queueLogFileName <> "." <> queueId <> logFileExt) +msgQueueStatePath :: FilePath -> RecipientId -> FilePath +msgQueueStatePath dir rId = dir (queueLogFileName <> "." <> B.unpack (strEncode rId) <> logFileExt) createNewJournal :: FilePath -> ByteString -> IO Handle createNewJournal dir journalId = do @@ -1045,104 +1039,29 @@ hClose h = closeOnException :: Handle -> IO a -> IO a closeOnException h a = a `E.onException` hClose h -exportJournalMessages :: Bool -> JournalMsgStore s -> Handle -> IO Int -exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExist storePath) exportStore (pure 0) - where - exportStore = do - (!qCount, !msgCount) <- foldQueues 0 exportQueueMessages (0, 0) ("", storePath) - putStrLn $ progress qCount - pure msgCount - JournalStoreConfig {storePath, pathParts} = config - exportQueueMessages :: (Int, Int) -> (String, FilePath) -> IO (Int, Int) - exportQueueMessages (!i, !count) (queueId, dir) = do - let i' = i + 1 - when (tty && i' `mod` 100000 == 0) $ putStr (progress i' <> "\r") >> IO.hFlush stdout - case strDecode $ B.pack queueId of - Right rId -> do - let statePath = msgQueueStatePath dir queueId - msgs <- - readQueueState ms statePath >>= \case - (Just MsgQueueState {readState = rs, writeState = ws, size}, _) - | size == 0 -> pure [] - | journalId rs == journalId ws -> - getMsgs rs (bytePos rs) (bytePos ws - bytePos rs) - | otherwise -> - (++) - <$> getMsgs rs (byteCount rs - bytePos rs) (bytePos rs) - <*> getMsgs ws (0 :: Int64) (bytePos ws) - where - getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message] - getMsgs js pos sz - | sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do - IO.hSeek h' AbsoluteSeek $ fromIntegral pos - parseMsgs f =<< B.hGet h' (fromIntegral sz) - | otherwise = pure [] - where - f = journalFilePath dir $ journalId js - parseMsgs f s = do - let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s - unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f - pure msgs - _ -> pure [] - unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages rId msgs - pure (i', count + length msgs) - Left e -> do - logError $ "STORE: exportQueueMessages, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e - exitFailure - progress i = "Processed: " <> show i <> " queues" - foldQueues depth f acc (queueId, path) = do - let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f - listDirs >>= foldM f' acc - where - listDirs = fmap catMaybes . mapM queuePath . sortBy compareB64 =<< listDirectory path - compareB64 [] [] = EQ - compareB64 [] _ = LT - compareB64 _ [] = GT - compareB64 (c : cs) (c' : cs') - | c == c' = compareB64 cs cs' - | otherwise = compare (charValue c) (charValue c') - charValue '-' = 62 - charValue '_' = 63 - charValue c - | c >= 'A' && c <= 'Z' = ord c - ord 'A' - | c >= 'a' && c <= 'z' = 26 + ord c - ord 'a' - | c >= '0' && c <= '9' = 52 + ord c - ord '0' - | otherwise = -1 - queuePath dir = do - let !path' = path dir - !queueId' = queueId <> dir - ifM - (doesDirectoryExist path') - (pure $ Just (queueId', path')) - (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) - getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message] -getJournalQueueMessages ms q = do - let rId = recipientId' q - dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) - readQueueState ms statePath >>= \case +getJournalQueueMessages ms q = + readQueueState ms (msgQueueStatePath dir rId) >>= \case (Just MsgQueueState {readState = rs, writeState = ws, size}, _) | size == 0 -> pure [] - | journalId rs == journalId ws -> do - let f = journalFilePath dir $ journalId rs - s <- B.readFile f - parseMsgs f $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s | otherwise -> do - let rf = journalFilePath dir $ journalId rs - wf = journalFilePath dir $ journalId ws - r <- B.readFile rf - w <- B.readFile wf - rMsgs <- parseMsgs rf $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r - wMsgs <- parseMsgs wf $ B.take (bytePos' ws) w - pure $ rMsgs ++ wMsgs + msgs <- getMsgs rs (bytePos rs) (byteCount rs - bytePos rs) + if journalId rs == journalId ws + then pure msgs + else (msgs ++) <$> getMsgs ws (0 :: Int64) (bytePos ws) _ -> pure [] where - bytePos' = fromIntegral . bytePos - parseMsgs f s = do - let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s - unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f - pure msgs - -encodeMessages :: RecipientId -> [Message] -> BLD.Builder -encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n') + rId = recipientId' q + dir = msgQueueDirectory ms rId + getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message] + getMsgs js pos sz + | sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do + IO.hSeek h' AbsoluteSeek $ fromIntegral pos + parseMsgs =<< B.hGet h' (fromIntegral sz) + | otherwise = pure [] + where + f = journalFilePath dir $ journalId js + parseMsgs s = do + let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s + unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f + pure msgs diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index e2d139ffb..b78b101f5 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -84,6 +84,7 @@ data MessageStats = MessageStats expiredMsgsCount :: Int, storedQueues :: Int } + deriving (Show) instance Monoid MessageStats where mempty = MessageStats 0 0 0 diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 321dc76ff..c5cf3a437 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -23,7 +23,6 @@ import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) -import qualified Data.ByteString.Base64.URL as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.List (isPrefixOf, isSuffixOf) @@ -227,9 +226,15 @@ testExportImportStore ms = do length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 exportMessages False (StoreJournal ms) testStoreMsgsFile False - -- testFastExport ms testStoreMsgsFile closeMsgStore ms closeStoreLog sl + -- export with closed queues and compare + ms2 <- newMsgStore $ testJournalStoreCfg MQStoreCfg + readWriteQueueStore True (mkQueue ms2 True) testStoreLogFile (stmQueueStore ms2) >>= closeStoreLog + exportMessages False (StoreJournal ms2) (testStoreMsgsFile <> ".copy") False + s <- B.readFile testStoreMsgsFile + B.readFile (testStoreMsgsFile <> ".copy") `shouldReturn` s + let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg readWriteQueueStore True (mkQueue ms' True) testStoreLogFile (stmQueueStore ms') >>= closeStoreLog @@ -239,7 +244,6 @@ testExportImportStore ms = do length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- 2 message files exportMessages False (StoreJournal ms') testStoreMsgsFile2 False - -- testFastExport ms' testStoreMsgsFile2 (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog @@ -247,18 +251,13 @@ testExportImportStore ms = do importMessages False stmStore testStoreMsgsFile2 Nothing False exportMessages False (StoreMemory stmStore) testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) - where - testFastExport ms' f = do - void $ withFile (f <> ".fast") WriteMode $ exportJournalMessages False ms' - s <- B.readFile f - B.readFile (f <> ".fast") `shouldReturn` s testQueueState :: JournalMsgStore s -> IO () testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId createDirectoryIfMissing True dir state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) @@ -319,7 +318,7 @@ testMessageState ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g QMMessaging let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId write q s = writeMsg ms q True =<< mkMessage s mId1 <- runRight $ do @@ -344,7 +343,7 @@ testRemoveJournals ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g QMMessaging let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId write q s = writeMsg ms q True =<< mkMessage s runRight $ do @@ -449,7 +448,7 @@ testExpireIdleQueues = do ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {idleInterval = 0} let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId write q s = writeMsg ms q True =<< mkMessage s q <- runRight $ do