mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 03:14:53 +00:00
cleanup
This commit is contained in:
@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
|
||||
import Simplex.Messaging.Server.Env.STM as Env
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Server.MsgStore
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, msgQueueDirectory)
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
|
||||
import Simplex.Messaging.Server.MsgStore.STM
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.NtfStore
|
||||
@@ -128,7 +128,7 @@ import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
|
||||
import System.Mem.Weak (deRefWeak)
|
||||
import UnliftIO (timeout)
|
||||
import UnliftIO.Concurrent
|
||||
import UnliftIO.Directory (doesDirectoryExist, doesFileExist, renameFile)
|
||||
import UnliftIO.Directory (doesFileExist, renameFile)
|
||||
import UnliftIO.Exception
|
||||
import UnliftIO.IO
|
||||
import UnliftIO.STM
|
||||
@@ -2111,31 +2111,28 @@ saveServerMessages drainMsgs ms = case ms of
|
||||
StoreJournal _ -> logNote "closed journal message storage"
|
||||
|
||||
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
|
||||
exportMessages tty ms f drainMsgs = do
|
||||
exportMessages tty st f drainMsgs = do
|
||||
logNote $ "saving messages to file " <> T.pack f
|
||||
liftIO $ withFile f WriteMode $ \h ->
|
||||
tryAny (unsafeWithAllMsgQueues tty False ms' $ saveQueueMsgs h) >>= \case
|
||||
Right (Sum total) -> logNote $ "messages saved: " <> tshow total
|
||||
run $ case st of
|
||||
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
|
||||
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
|
||||
where
|
||||
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty True ms . saveQueueMsgs get
|
||||
run :: (Handle -> IO Int) -> IO ()
|
||||
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
|
||||
Right n -> logNote $ "messages saved: " <> tshow n
|
||||
Left e -> do
|
||||
logError $ "error exporting messages: " <> tshow e
|
||||
exitFailure
|
||||
where
|
||||
ms' :: s
|
||||
ms' = case ms of
|
||||
StoreMemory s -> s
|
||||
StoreJournal s -> s
|
||||
getMessages q = case ms of
|
||||
StoreMemory _ -> getMsgs
|
||||
StoreJournal _ ->
|
||||
ifM
|
||||
(doesDirectoryExist $ msgQueueDirectory ms' $ recipientId q)
|
||||
getMsgs
|
||||
(pure [])
|
||||
-- getJournalQueueMessages ms' q
|
||||
where
|
||||
getMsgs = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
|
||||
saveQueueMsgs h q = do
|
||||
msgs <- getMessages q
|
||||
getJournalMsgs ms q =
|
||||
readTVarIO (msgQueue q) >>= \case
|
||||
Just _ -> getMsgs ms q
|
||||
Nothing -> getJournalQueueMessages ms q
|
||||
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
|
||||
getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
|
||||
saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int)
|
||||
saveQueueMsgs get h q = do
|
||||
msgs <- get q
|
||||
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
|
||||
pure $ Sum $ length msgs
|
||||
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
|
||||
|
||||
@@ -52,7 +52,7 @@ import Simplex.Messaging.Server.Env.STM
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Server.Information
|
||||
import Simplex.Messaging.Server.Main.Init
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), QStoreCfg (..), exportJournalMessages, stmQueueStore)
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), QStoreCfg (..), stmQueueStore)
|
||||
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SQSType (..), SMSType (..), newMsgStore)
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres.Config
|
||||
import Simplex.Messaging.Server.StoreLog.ReadWrite (readQueueStore)
|
||||
@@ -60,11 +60,11 @@ import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupp
|
||||
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultSocksProxy)
|
||||
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
|
||||
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
|
||||
import Simplex.Messaging.Util (eitherToMaybe, ifM, tshow, unlessM)
|
||||
import Simplex.Messaging.Util (eitherToMaybe, ifM, unlessM)
|
||||
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (combine)
|
||||
import System.IO (BufferMode (..), IOMode (..), hSetBuffering, stderr, stdout, withFile)
|
||||
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
#if defined(dbServerPostgres)
|
||||
@@ -131,25 +131,24 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, update it to `journal` in INI file"
|
||||
Right (ASType _ SMSJournal) -> "store_messages set to `journal`"
|
||||
Left e -> e <> ", configure storage correctly"
|
||||
SCExport fast
|
||||
SCExport
|
||||
| msgsFileExists && msgsDirExists -> exitConfigureMsgStorage
|
||||
| msgsFileExists -> do
|
||||
putStrLn $ storeMsgsFilePath <> " file already exists."
|
||||
exitFailure
|
||||
| fast -> do
|
||||
confirmExport
|
||||
logNote $ "saving messages to file " <> T.pack storeMsgsFilePath
|
||||
ms <- newJournalMsgStore logPath MQStoreCfg
|
||||
total <- withFile storeMsgsFilePath WriteMode $ exportJournalMessages True ms
|
||||
logNote $ "messages saved: " <> tshow total
|
||||
completedExport
|
||||
| otherwise -> do
|
||||
confirmExport
|
||||
confirmOrExit
|
||||
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
|
||||
"Journal not exported"
|
||||
case readStoreType ini of
|
||||
Right (ASType SQSMemory _) -> do
|
||||
Right (ASType SQSMemory msType) -> do
|
||||
ms <- newJournalMsgStore logPath MQStoreCfg
|
||||
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
|
||||
exportMessages True (StoreJournal ms) storeMsgsFilePath False
|
||||
putStrLn "Export completed"
|
||||
putStrLn $ case msType of
|
||||
SMSMemory -> "store_messages set to `memory`, start the server."
|
||||
SMSJournal -> "store_messages set to `journal`, update it to `memory` in INI file"
|
||||
Right (ASType SQSPostgres SMSJournal) -> do
|
||||
#if defined(dbServerPostgres)
|
||||
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
|
||||
@@ -159,23 +158,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
exitFailure
|
||||
ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
|
||||
exportMessages True (StoreJournal ms) storeMsgsFilePath False
|
||||
putStrLn "Export completed"
|
||||
putStrLn "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)."
|
||||
#else
|
||||
noPostgresExit
|
||||
#endif
|
||||
Left _ -> pure ()
|
||||
completedExport
|
||||
where
|
||||
confirmExport =
|
||||
confirmOrExit
|
||||
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
|
||||
"Journal not exported"
|
||||
completedExport = do
|
||||
putStrLn "Export completed"
|
||||
putStrLn $ case readStoreType ini of
|
||||
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, start the server."
|
||||
Right (ASType SQSMemory SMSJournal) -> "store_messages set to `journal`, update it to `memory` in INI file"
|
||||
Right (ASType SQSPostgres SMSJournal) -> "store_messages set to `journal`, store_queues is set to `database`.\nExport queues to store log to use memory storage for messages (`smp-server database export`)."
|
||||
Left e -> e <> ", configure storage correctly"
|
||||
Left e -> putStrLn $ e <> ", configure storage correctly"
|
||||
SCDelete
|
||||
| not msgsDirExists -> do
|
||||
putStrLn $ storeMsgsJournalDir <> " directory does not exists."
|
||||
@@ -214,7 +202,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
where
|
||||
setToDbStr :: String
|
||||
setToDbStr = "store_queues set to `memory`, update it to `database` in INI file"
|
||||
SCExport _
|
||||
SCExport
|
||||
| schemaExists && storeLogExists -> exitConfigureQueueStore connstr schema
|
||||
| not schemaExists -> do
|
||||
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
|
||||
@@ -691,7 +679,7 @@ data CliCommand
|
||||
| Journal StoreCmd
|
||||
| Database StoreCmd DBOpts
|
||||
|
||||
data StoreCmd = SCImport | SCExport Bool | SCDelete
|
||||
data StoreCmd = SCImport | SCExport | SCDelete
|
||||
|
||||
cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand
|
||||
cliCommandP cfgPath logPath iniFile =
|
||||
@@ -852,8 +840,7 @@ cliCommandP cfgPath logPath iniFile =
|
||||
storeCmdP src dest =
|
||||
hsubparser
|
||||
( command "import" (info (pure SCImport) (progDesc $ "Import " <> src <> " into a new " <> dest))
|
||||
<> command "export" (info (pure $ SCExport False) (progDesc $ "Export " <> dest <> " to " <> src))
|
||||
<> command "fast-export" (info (pure $ SCExport True) (progDesc $ "Fast export of " <> dest <> " to " <> src))
|
||||
<> command "export" (info (pure SCExport) (progDesc $ "Export " <> dest <> " to " <> src))
|
||||
<> command "delete" (info (pure SCDelete) (progDesc $ "Delete " <> dest))
|
||||
)
|
||||
parseBasicAuth :: ReadM ServerPassword
|
||||
|
||||
@@ -38,6 +38,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
|
||||
msgQueueStatePath,
|
||||
readQueueState,
|
||||
newMsgQueueState,
|
||||
getJournalQueueMessages,
|
||||
newJournalId,
|
||||
appendState,
|
||||
queueLogFileName,
|
||||
@@ -47,9 +48,6 @@ module Simplex.Messaging.Server.MsgStore.Journal
|
||||
#if defined(dbServerPostgres)
|
||||
postgresQueueStore,
|
||||
#endif
|
||||
exportJournalMessages,
|
||||
getJournalQueueMessages,
|
||||
encodeMessages,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -59,16 +57,14 @@ import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Trans.Except
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Builder as BLD
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Char (ord)
|
||||
import Data.Either (fromRight, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (sort, sortBy)
|
||||
import Data.List (sort)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
@@ -80,7 +76,6 @@ import Simplex.Messaging.Agent.Client (getMapLock)
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.MsgStore
|
||||
import Simplex.Messaging.Server.MsgStore.Journal.SharedLock
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
@@ -93,9 +88,8 @@ import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
|
||||
import System.Directory
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (takeFileName, (</>))
|
||||
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout)
|
||||
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
|
||||
import qualified System.IO as IO
|
||||
import System.Random (StdGen, genByteString, newStdGen)
|
||||
|
||||
@@ -492,7 +486,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
where
|
||||
newQ = do
|
||||
let dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
|
||||
statePath = msgQueueStatePath dir rId
|
||||
queue = JMQueue {queueDirectory = dir, statePath}
|
||||
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
|
||||
atomically $ writeTVar msgQueue' $ Just q
|
||||
@@ -820,8 +814,8 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP
|
||||
let (seg, s') = B.splitAt 2 s
|
||||
in seg : splitSegments (n - 1) s'
|
||||
|
||||
msgQueueStatePath :: FilePath -> String -> FilePath
|
||||
msgQueueStatePath dir queueId = dir </> (queueLogFileName <> "." <> queueId <> logFileExt)
|
||||
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
|
||||
msgQueueStatePath dir rId = dir </> (queueLogFileName <> "." <> B.unpack (strEncode rId) <> logFileExt)
|
||||
|
||||
createNewJournal :: FilePath -> ByteString -> IO Handle
|
||||
createNewJournal dir journalId = do
|
||||
@@ -1045,104 +1039,29 @@ hClose h =
|
||||
closeOnException :: Handle -> IO a -> IO a
|
||||
closeOnException h a = a `E.onException` hClose h
|
||||
|
||||
exportJournalMessages :: Bool -> JournalMsgStore s -> Handle -> IO Int
|
||||
exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExist storePath) exportStore (pure 0)
|
||||
where
|
||||
exportStore = do
|
||||
(!qCount, !msgCount) <- foldQueues 0 exportQueueMessages (0, 0) ("", storePath)
|
||||
putStrLn $ progress qCount
|
||||
pure msgCount
|
||||
JournalStoreConfig {storePath, pathParts} = config
|
||||
exportQueueMessages :: (Int, Int) -> (String, FilePath) -> IO (Int, Int)
|
||||
exportQueueMessages (!i, !count) (queueId, dir) = do
|
||||
let i' = i + 1
|
||||
when (tty && i' `mod` 100000 == 0) $ putStr (progress i' <> "\r") >> IO.hFlush stdout
|
||||
case strDecode $ B.pack queueId of
|
||||
Right rId -> do
|
||||
let statePath = msgQueueStatePath dir queueId
|
||||
msgs <-
|
||||
readQueueState ms statePath >>= \case
|
||||
(Just MsgQueueState {readState = rs, writeState = ws, size}, _)
|
||||
| size == 0 -> pure []
|
||||
| journalId rs == journalId ws ->
|
||||
getMsgs rs (bytePos rs) (bytePos ws - bytePos rs)
|
||||
| otherwise ->
|
||||
(++)
|
||||
<$> getMsgs rs (byteCount rs - bytePos rs) (bytePos rs)
|
||||
<*> getMsgs ws (0 :: Int64) (bytePos ws)
|
||||
where
|
||||
getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message]
|
||||
getMsgs js pos sz
|
||||
| sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do
|
||||
IO.hSeek h' AbsoluteSeek $ fromIntegral pos
|
||||
parseMsgs f =<< B.hGet h' (fromIntegral sz)
|
||||
| otherwise = pure []
|
||||
where
|
||||
f = journalFilePath dir $ journalId js
|
||||
parseMsgs f s = do
|
||||
let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s
|
||||
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
|
||||
pure msgs
|
||||
_ -> pure []
|
||||
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages rId msgs
|
||||
pure (i', count + length msgs)
|
||||
Left e -> do
|
||||
logError $ "STORE: exportQueueMessages, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e
|
||||
exitFailure
|
||||
progress i = "Processed: " <> show i <> " queues"
|
||||
foldQueues depth f acc (queueId, path) = do
|
||||
let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f
|
||||
listDirs >>= foldM f' acc
|
||||
where
|
||||
listDirs = fmap catMaybes . mapM queuePath . sortBy compareB64 =<< listDirectory path
|
||||
compareB64 [] [] = EQ
|
||||
compareB64 [] _ = LT
|
||||
compareB64 _ [] = GT
|
||||
compareB64 (c : cs) (c' : cs')
|
||||
| c == c' = compareB64 cs cs'
|
||||
| otherwise = compare (charValue c) (charValue c')
|
||||
charValue '-' = 62
|
||||
charValue '_' = 63
|
||||
charValue c
|
||||
| c >= 'A' && c <= 'Z' = ord c - ord 'A'
|
||||
| c >= 'a' && c <= 'z' = 26 + ord c - ord 'a'
|
||||
| c >= '0' && c <= '9' = 52 + ord c - ord '0'
|
||||
| otherwise = -1
|
||||
queuePath dir = do
|
||||
let !path' = path </> dir
|
||||
!queueId' = queueId <> dir
|
||||
ifM
|
||||
(doesDirectoryExist path')
|
||||
(pure $ Just (queueId', path'))
|
||||
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
|
||||
|
||||
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
|
||||
getJournalQueueMessages ms q = do
|
||||
let rId = recipientId' q
|
||||
dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
|
||||
readQueueState ms statePath >>= \case
|
||||
getJournalQueueMessages ms q =
|
||||
readQueueState ms (msgQueueStatePath dir rId) >>= \case
|
||||
(Just MsgQueueState {readState = rs, writeState = ws, size}, _)
|
||||
| size == 0 -> pure []
|
||||
| journalId rs == journalId ws -> do
|
||||
let f = journalFilePath dir $ journalId rs
|
||||
s <- B.readFile f
|
||||
parseMsgs f $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s
|
||||
| otherwise -> do
|
||||
let rf = journalFilePath dir $ journalId rs
|
||||
wf = journalFilePath dir $ journalId ws
|
||||
r <- B.readFile rf
|
||||
w <- B.readFile wf
|
||||
rMsgs <- parseMsgs rf $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r
|
||||
wMsgs <- parseMsgs wf $ B.take (bytePos' ws) w
|
||||
pure $ rMsgs ++ wMsgs
|
||||
msgs <- getMsgs rs (bytePos rs) (byteCount rs - bytePos rs)
|
||||
if journalId rs == journalId ws
|
||||
then pure msgs
|
||||
else (msgs ++) <$> getMsgs ws (0 :: Int64) (bytePos ws)
|
||||
_ -> pure []
|
||||
where
|
||||
bytePos' = fromIntegral . bytePos
|
||||
parseMsgs f s = do
|
||||
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
|
||||
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
|
||||
pure msgs
|
||||
|
||||
encodeMessages :: RecipientId -> [Message] -> BLD.Builder
|
||||
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
|
||||
rId = recipientId' q
|
||||
dir = msgQueueDirectory ms rId
|
||||
getMsgs :: JournalState t -> Int64 -> Int64 -> IO [Message]
|
||||
getMsgs js pos sz
|
||||
| sz > 0 = IO.withFile f ReadWriteMode $ \h' -> do
|
||||
IO.hSeek h' AbsoluteSeek $ fromIntegral pos
|
||||
parseMsgs =<< B.hGet h' (fromIntegral sz)
|
||||
| otherwise = pure []
|
||||
where
|
||||
f = journalFilePath dir $ journalId js
|
||||
parseMsgs s = do
|
||||
let (errs, msgs) = partitionEithers $ map (strDecode @Message) $ B.lines s
|
||||
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
|
||||
pure msgs
|
||||
|
||||
@@ -84,6 +84,7 @@ data MessageStats = MessageStats
|
||||
expiredMsgsCount :: Int,
|
||||
storedQueues :: Int
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
instance Monoid MessageStats where
|
||||
mempty = MessageStats 0 0 0
|
||||
|
||||
@@ -23,7 +23,6 @@ import Control.Monad
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.List (isPrefixOf, isSuffixOf)
|
||||
@@ -227,9 +226,15 @@ testExportImportStore ms = do
|
||||
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
|
||||
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3
|
||||
exportMessages False (StoreJournal ms) testStoreMsgsFile False
|
||||
-- testFastExport ms testStoreMsgsFile
|
||||
closeMsgStore ms
|
||||
closeStoreLog sl
|
||||
-- export with closed queues and compare
|
||||
ms2 <- newMsgStore $ testJournalStoreCfg MQStoreCfg
|
||||
readWriteQueueStore True (mkQueue ms2 True) testStoreLogFile (stmQueueStore ms2) >>= closeStoreLog
|
||||
exportMessages False (StoreJournal ms2) (testStoreMsgsFile <> ".copy") False
|
||||
s <- B.readFile testStoreMsgsFile
|
||||
B.readFile (testStoreMsgsFile <> ".copy") `shouldReturn` s
|
||||
|
||||
let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2}
|
||||
ms' <- newMsgStore cfg
|
||||
readWriteQueueStore True (mkQueue ms' True) testStoreLogFile (stmQueueStore ms') >>= closeStoreLog
|
||||
@@ -239,7 +244,6 @@ testExportImportStore ms = do
|
||||
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
|
||||
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- 2 message files
|
||||
exportMessages False (StoreJournal ms') testStoreMsgsFile2 False
|
||||
-- testFastExport ms' testStoreMsgsFile2
|
||||
(B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak")
|
||||
stmStore <- newMsgStore testSMTStoreConfig
|
||||
readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog
|
||||
@@ -247,18 +251,13 @@ testExportImportStore ms = do
|
||||
importMessages False stmStore testStoreMsgsFile2 Nothing False
|
||||
exportMessages False (StoreMemory stmStore) testStoreMsgsFile False
|
||||
(B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak"))
|
||||
where
|
||||
testFastExport ms' f = do
|
||||
void $ withFile (f <> ".fast") WriteMode $ exportJournalMessages False ms'
|
||||
s <- B.readFile f
|
||||
B.readFile (f <> ".fast") `shouldReturn` s
|
||||
|
||||
testQueueState :: JournalMsgStore s -> IO ()
|
||||
testQueueState ms = do
|
||||
g <- C.newRandom
|
||||
rId <- EntityId <$> atomically (C.randomBytes 24 g)
|
||||
let dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
|
||||
statePath = msgQueueStatePath dir rId
|
||||
createDirectoryIfMissing True dir
|
||||
state <- newMsgQueueState <$> newJournalId (random ms)
|
||||
withFile statePath WriteMode (`appendState` state)
|
||||
@@ -319,7 +318,7 @@ testMessageState ms = do
|
||||
g <- C.newRandom
|
||||
(rId, qr) <- testNewQueueRec g QMMessaging
|
||||
let dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
|
||||
statePath = msgQueueStatePath dir rId
|
||||
write q s = writeMsg ms q True =<< mkMessage s
|
||||
|
||||
mId1 <- runRight $ do
|
||||
@@ -344,7 +343,7 @@ testRemoveJournals ms = do
|
||||
g <- C.newRandom
|
||||
(rId, qr) <- testNewQueueRec g QMMessaging
|
||||
let dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
|
||||
statePath = msgQueueStatePath dir rId
|
||||
write q s = writeMsg ms q True =<< mkMessage s
|
||||
|
||||
runRight $ do
|
||||
@@ -449,7 +448,7 @@ testExpireIdleQueues = do
|
||||
ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {idleInterval = 0}
|
||||
|
||||
let dir = msgQueueDirectory ms rId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
|
||||
statePath = msgQueueStatePath dir rId
|
||||
write q s = writeMsg ms q True =<< mkMessage s
|
||||
|
||||
q <- runRight $ do
|
||||
|
||||
Reference in New Issue
Block a user