From fa319d798ab17c02f0d8206605a19b4f30eeea2b Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 17 Feb 2025 23:11:34 +0000 Subject: [PATCH] smp server: remove empty journals when opening message queue (#1456) * smp server: remove empty journals when opening message queue * update, do not backup state * test * version * do not close queue state when queue is opened for writing * comment * quota = 4 * refactor openMsgQueue to prevent extra state backups * use interval in config * version, expire backups after 5 min * refactor * test --- cabal.project | 1 + simplexmq.cabal | 1 + src/Simplex/Messaging/Server/Env/STM.hs | 20 +- src/Simplex/Messaging/Server/Main.hs | 5 +- .../Messaging/Server/MsgStore/Journal.hs | 183 ++++++++++++------ src/Simplex/Messaging/Server/MsgStore/STM.hs | 6 +- .../Messaging/Server/MsgStore/Types.hs | 4 +- tests/AgentTests/FunctionalAPITests.hs | 1 + tests/CoreTests/MsgStoreTests.hs | 140 ++++++++++++-- tests/README.md | 7 + 10 files changed, 280 insertions(+), 88 deletions(-) create mode 100644 tests/README.md diff --git a/cabal.project b/cabal.project index b26c04055..924f09c9e 100644 --- a/cabal.project +++ b/cabal.project @@ -4,6 +4,7 @@ packages: . -- packages: . ../http2 -- packages: . ../network-transport +-- uncomment two sections below to run tests with coverage -- package * -- coverage: True -- library-coverage: True diff --git a/simplexmq.cabal b/simplexmq.cabal index 334dcbcfe..40da60b49 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -457,6 +457,7 @@ test-suite simplexmq-test apps/smp-server/web default-extensions: StrictData + -- add -fhpc to ghc-options to run tests with coverage ghc-options: -Weverything -Wno-missing-exported-signatures -Wno-missing-import-lists -Wno-missed-specialisations -Wno-all-missed-specialisations -Wno-unsafe -Wno-safe -Wno-missing-local-signatures -Wno-missing-kind-signatures -Wno-missing-deriving-strategies -Wno-monomorphism-restriction -Wno-prepositive-qualified-module -Wno-implicit-prelude -Wno-missing-safe-haskell-mode -Wno-missing-export-lists -Wno-partial-fields -Wcompat -Werror=incomplete-record-updates -Werror=incomplete-patterns -Werror=incomplete-uni-patterns -Werror=missing-methods -Werror=tabs -Wredundant-constraints -Wincomplete-record-updates -Wunused-type-patterns -O2 -threaded -rtsopts -with-rtsopts=-A64M -with-rtsopts=-N1 build-depends: base diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 4044e0d22..58d57a4c5 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -25,7 +25,7 @@ import Data.List (intercalate) import Data.List.NonEmpty (NonEmpty) import Data.Maybe (isJust, isNothing) import qualified Data.Text as T -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (getCurrentTime, nominalDay) import Data.Time.Clock.System (SystemTime) import qualified Data.X509 as X import Data.X509.Validation (Fingerprint (..)) @@ -297,8 +297,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt msgStore@(AMS _ store) <- case msgStoreType of AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of - Just storePath -> - let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} + Just storePath -> + let cfg = mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval in AMS SMSJournal <$> newMsgStore cfg Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO @@ -357,6 +357,20 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt | isJust storeMsgsFile = SPMMessages | otherwise = SPMQueues +mkJournalStoreConfig :: FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig +mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval = + JournalStoreConfig + { storePath, + quota = msgQueueQuota, + pathParts = journalMsgStoreDepth, + maxMsgCount = maxJournalMsgCount, + maxStateLines = maxJournalStateLines, + stateTailSize = defaultStateTailSize, + idleInterval = idleQueueInterval, + expireBackupsAfter = 14 * nominalDay, + keepMinBackups = 2 + } + newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 1d21ffa6a..a03aaa68d 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -44,7 +44,6 @@ import Simplex.Messaging.Server.CLI import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Information -import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..)) import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore) import Simplex.Messaging.Server.QueueStore.STM (readQueueStore) import Simplex.Messaging.Transport (simplexMQVersion, supportedProxyClientSMPRelayVRange, supportedServerSMPRelayVRange) @@ -147,7 +146,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = doesFileExist iniFile >>= \case True -> readIniFile iniFile >>= either exitError a _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." - newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration} + newJournalMsgStore = + let cfg = mkJournalStoreConfig storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration + in newMsgStore cfg iniFile = combine cfgPath "smp-server.ini" serverVersion = "SMP server v" <> simplexMQVersion defaultServerPorts = "5223,443" diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0834ad1d9..3b897de37 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -15,7 +15,7 @@ {-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.MsgStore.Journal - ( JournalMsgStore (queueStore, random), + ( JournalMsgStore (queueStore, random, expireBackupsBefore), JournalQueue, JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), @@ -28,7 +28,7 @@ module Simplex.Messaging.Server.MsgStore.Journal SJournalType (..), msgQueueDirectory, msgQueueStatePath, - readWriteQueueState, + readQueueState, newMsgQueueState, newJournalId, appendState, @@ -48,12 +48,13 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (intercalate) -import Data.Maybe (catMaybes, fromMaybe, isNothing) +import Data.List (intercalate, sort) +import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe) +import Data.Text (Text) import qualified Data.Text as T -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) -import Data.Time.Format.ISO8601 (iso8601Show) +import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM) import GHC.IO (catchAny) import Simplex.Messaging.Agent.Client (getMapLock, withLockMap) import Simplex.Messaging.Agent.Lock @@ -65,10 +66,10 @@ import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>)) +import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory import System.Exit -import System.FilePath (()) +import System.FilePath (takeFileName, ()) import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) @@ -77,7 +78,8 @@ data JournalMsgStore = JournalMsgStore { config :: JournalStoreConfig, random :: TVar StdGen, queueLocks :: TMap RecipientId Lock, - queueStore :: STMQueueStore JournalQueue + queueStore :: STMQueueStore JournalQueue, + expireBackupsBefore :: UTCTime } data JournalStoreConfig = JournalStoreConfig @@ -91,7 +93,10 @@ data JournalStoreConfig = JournalStoreConfig maxStateLines :: Int, stateTailSize :: Int, -- time in seconds after which the queue will be closed after message expiration - idleInterval :: Int64 + idleInterval :: Int64, + -- expire state backup files + expireBackupsAfter :: NominalDiffTime, + keepMinBackups :: Int } data JournalQueue = JournalQueue @@ -238,7 +243,8 @@ instance MsgStoreClass JournalMsgStore where random <- newTVarIO =<< newStdGen queueLocks <- TM.emptyIO queueStore <- newQueueStore - pure JournalMsgStore {config, random, queueLocks, queueStore} + expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime + pure JournalMsgStore {config, random, queueLocks, queueStore, expireBackupsBefore} setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO () setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl) @@ -265,7 +271,7 @@ instance MsgStoreClass JournalMsgStore where r' <- case strDecode $ B.pack queueId of Right rId -> getQueue ms SRecipient rId >>= \case - Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q + Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q Left AUTH -> do logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir removeQueueDirectory_ dir @@ -307,15 +313,15 @@ instance MsgStoreClass JournalMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO JournalMsgQueue - getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} = + getMsgQueue :: JournalMsgStore -> JournalQueue -> Bool -> StoreIO JournalMsgQueue + getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} forWrite = StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where newQ = do let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) queue = JMQueue {queueDirectory = dir, statePath} - q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue) + q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue) atomically $ writeTVar msgQueue_ $ Just q pure q where @@ -342,7 +348,7 @@ instance MsgStoreClass JournalMsgStore where pure r where peek = do - mq <- getMsgQueue ms q + mq <- getMsgQueue ms q False (mq,) <$$> tryPeekMsg_ q mq -- only runs action if queue is not empty @@ -390,7 +396,7 @@ instance MsgStoreClass JournalMsgStore where writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do - q <- getMsgQueue ms q' + q <- getMsgQueue ms q' True StoreIO $ (`E.finally` updateActiveAt q') $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) let empty = size == 0 @@ -425,7 +431,6 @@ instance MsgStoreClass JournalMsgStore where createQueueDir = do createDirectoryIfMissing True queueDirectory sh <- openFile statePath AppendMode - B.hPutStr sh "" rh <- createNewJournal queueDirectory $ journalId rs let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} atomically $ writeTVar handles $ Just hs @@ -488,12 +493,65 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op -openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue -openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do - (st, sh) <- readWriteQueueState ms statePath - (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh - let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} - mkJournalQueue q st' (Just hs) +openMsgQueue :: JournalMsgStore -> JMQueue -> Bool -> IO JournalMsgQueue +openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do + (st_, shouldBackup) <- readQueueState ms statePath + case st_ of + Nothing -> do + st <- newMsgQueueState <$> newJournalId (random ms) + when shouldBackup $ backupQueueState statePath -- rename invalid state file + mkJournalQueue q st Nothing + Just st + | size st == 0 -> do + (st', hs_) <- removeJournals st shouldBackup + mkJournalQueue q st' hs_ + | otherwise -> do + sh <- openBackupQueueState st shouldBackup + (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh + let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} + mkJournalQueue q st' (Just hs) + where + -- If the queue is empty, journals are deleted. + -- New journal is created if queue is written to. + -- canWrite is set to True. + removeJournals MsgQueueState {readState = rs, writeState = ws} shouldBackup = E.uninterruptibleMask_ $ do + rjId <- newJournalId $ random ms + let st = newMsgQueueState rjId + hs_ <- + if forWrite + then Just <$> newJournalHandles st rjId + else Nothing <$ backupQueueState statePath + removeJournalIfExists dir rs + unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws + pure (st, hs_) + where + newJournalHandles st rjId = do + sh <- openBackupQueueState st shouldBackup + appendState_ sh st + rh <- closeOnException sh $ createNewJournal dir rjId + pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} + openBackupQueueState st shouldBackup + | shouldBackup = do + -- State backup is made in two steps to mitigate the crash during the backup. + -- Temporary backup file will be used when it is present. + let tempBackup = statePath <> ".bak" + renameFile statePath tempBackup -- 1) temp backup + sh <- openFile statePath AppendMode + closeOnException sh $ appendState sh st -- 2) save state to new file + backupQueueState tempBackup -- 3) timed backup + pure sh + | otherwise = openFile statePath AppendMode + backupQueueState path = do + ts <- getCurrentTime + renameFile path $ stateBackupPath statePath ts + -- remove old backups + times <- sort . mapMaybe backupPathTime <$> listDirectory dir + let toDelete = filter (< expireBackupsBefore ms) $ take (length times - keepMinBackups config) times + mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete + where + backupPathTime :: FilePath -> Maybe UTCTime + backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack + statePathPfx = T.pack $ takeFileName statePath <> "." mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue mkJournalQueue queue st hs_ = do @@ -527,7 +585,11 @@ updateQueueState q log' hs st a = do atomically $ writeTVar (state q) st >> a appendState :: Handle -> MsgQueueState -> IO () -appendState h st = E.uninterruptibleMask_ $ B.hPutStr h $ strEncode st `B.snoc` '\n' +appendState h = E.uninterruptibleMask_ . appendState_ h +{-# INLINE appendState #-} + +appendState_ :: Handle -> MsgQueueState -> IO () +appendState_ h st = B.hPutStr h $ strEncode st `B.snoc` '\n' updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO () updateReadPos q log' len hs = do @@ -628,62 +690,57 @@ fixFileSize h pos = do | otherwise -> pure () removeJournal :: FilePath -> JournalState t -> IO () -removeJournal dir JournalState {journalId} = do +removeJournal dir JournalState {journalId} = + safeRemoveFile "removeJournal" $ journalFilePath dir journalId + +removeJournalIfExists :: FilePath -> JournalState t -> IO () +removeJournalIfExists dir JournalState {journalId} = do let path = journalFilePath dir journalId - removeFile path `catchAny` (\e -> logError $ "STORE: removeJournal, " <> T.pack path <> ", " <> tshow e) + handleError "removeJournalIfExists" path $ + whenM (doesFileExist path) $ removeFile path + +safeRemoveFile :: Text -> FilePath -> IO () +safeRemoveFile cxt path = handleError cxt path $ removeFile path + +handleError :: Text -> FilePath -> IO () -> IO () +handleError cxt path a = + a `catchAny` \e -> logError $ "STORE: " <> cxt <> ", " <> T.pack path <> ", " <> tshow e -- This function is supposed to be resilient to crashes while updating state files, -- and also resilient to crashes during its execution. -readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle) -readWriteQueueState JournalMsgStore {random, config} statePath = +readQueueState :: JournalMsgStore -> FilePath -> IO (Maybe MsgQueueState, Bool) +readQueueState JournalMsgStore {config} statePath = ifM (doesFileExist tempBackup) - (renameFile tempBackup statePath >> readQueueState) - (ifM (doesFileExist statePath) readQueueState writeNewQueueState) + (renameFile tempBackup statePath >> readState) + (ifM (doesFileExist statePath) readState $ pure (Nothing, False)) where tempBackup = statePath <> ".bak" - readQueueState = do + readState = do ls <- B.lines <$> readFileTail case ls of - [] -> writeNewQueueState + [] -> do + logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath + pure (Nothing, False) _ -> do - r@(st, _) <- useLastLine (length ls) True ls - unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st + r <- useLastLine (length ls) True ls + forM_ (fst r) $ \st -> + unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st pure r - writeNewQueueState = do - logWarn $ "STORE: readWriteQueueState, empty queue state - initialized, " <> T.pack statePath - st <- newMsgQueueState <$> newJournalId random - writeQueueState st useLastLine len isLastLine ls = case strDecode $ last ls of - Right st - | len > maxStateLines config || not isLastLine -> - backupWriteQueueState st - | otherwise -> do - -- when state file has fewer than maxStateLines, we don't compact it - sh <- openFile statePath AppendMode - pure (st, sh) + Right st -> + -- when state file has fewer than maxStateLines, we don't compact it + let shouldBackup = len > maxStateLines config || not isLastLine + in pure (Just st, shouldBackup) Left e -- if the last line failed to parse | isLastLine -> case init ls of -- or use the previous line [] -> do logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath - st <- newMsgQueueState <$> newJournalId random - backupWriteQueueState st + pure (Nothing, True) -- backup state file, because last line was invalid ls' -> do logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath useLastLine len False ls' | otherwise -> E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e - backupWriteQueueState st = do - -- State backup is made in two steps to mitigate the crash during the backup. - -- Temporary backup file will be used when it is present. - renameFile statePath tempBackup -- 1) temp backup - r <- writeQueueState st -- 2) save state - ts <- getCurrentTime - renameFile tempBackup (statePath <> "." <> iso8601Show ts <> ".bak") -- 3) timed backup - pure r - writeQueueState st = do - sh <- openFile statePath AppendMode - closeOnException sh $ appendState sh st - pure (st, sh) readFileTail = IO.withFile statePath ReadMode $ \h -> do size <- IO.hFileSize h @@ -693,6 +750,9 @@ readWriteQueueState JournalMsgStore {random, config} statePath = then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz else B.hGet h (fromIntegral size) +stateBackupPath :: FilePath -> UTCTime -> FilePath +stateBackupPath statePath ts = statePath <> "." <> iso8601Show ts <> ".bak" + validQueueState :: MsgQueueState -> Bool validQueueState MsgQueueState {readState = rs, writeState = ws, size} | journalId rs == journalId ws = @@ -739,8 +799,7 @@ removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st removeQueueDirectory_ :: FilePath -> IO () removeQueueDirectory_ dir = - removePathForcibly dir `catchAny` \e -> - logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e + handleError "removeQueueDirectory" dir $ removePathForcibly dir hAppend :: Handle -> Int64 -> ByteString -> IO () hAppend h pos s = do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 2df41a9f1..05ab31475 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -89,8 +89,8 @@ instance MsgStoreClass STMMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue - getMsgQueue _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure + getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue + getMsgQueue _ STMQueue {msgQueue_} _ = readTVar msgQueue_ >>= maybe newQ pure where newQ = do msgQueue <- newTQueue @@ -131,7 +131,7 @@ instance MsgStoreClass STMMsgStore where writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms q' _logState msg = liftIO $ atomically $ do - STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' + STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' True canWrt <- readTVar canWrite empty <- isEmptyTQueue q if canWrt || empty diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 7317a0fab..679945f55 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -54,7 +54,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where recipientId' :: StoreQueue s -> RecipientId queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) - getMsgQueue :: s -> StoreQueue s -> StoreMonad s (MsgQueue s) + getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s) -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) @@ -119,7 +119,7 @@ withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int deleteExpiredMsgs st q old = isolateQueue q "deleteExpiredMsgs" $ - getMsgQueue st q >>= deleteExpireMsgs_ old q + getMsgQueue st q False >>= deleteExpireMsgs_ old q -- closed and idle queues will be closed after expiration -- returns (expired count, queue size after expiration) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 80fff1dbb..027b4cff3 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -364,6 +364,7 @@ functionalAPITests t = do it "should suspend agent on timeout, even if pending messages not sent" $ testSuspendingAgentTimeout t describe "Batching SMP commands" $ do + -- disable this and enable the following test to run tests with coverage it "should subscribe to multiple (200) subscriptions with batching" $ testBatchedSubscriptions 200 10 t skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $ diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index f97930b52..342b5b25f 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -15,6 +15,7 @@ module CoreTests.MsgStoreTests where import AgentTests.FunctionalAPITests (runRight, runRight_) +import Control.Concurrent (threadDelay) import Control.Concurrent.STM import Control.Exception (bracket) import Control.Monad @@ -24,7 +25,9 @@ import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Base64.URL as B64 +import Data.List (isPrefixOf, isSuffixOf) import Data.Maybe (fromJust) +import Data.Time.Clock (addUTCTime) import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C @@ -40,7 +43,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) -import System.IO (IOMode (..), hClose, withFile) +import System.IO (IOMode (..), withFile) import Test.Hspec msgStoreTests :: Spec @@ -52,11 +55,14 @@ msgStoreTests = do describe "queue state" $ do it "should restore queue state from the last line" testQueueState it "should recover when message is written and state is not" testMessageState + it "should remove journal files when queue is empty" testRemoveJournals describe "missing files" $ do it "should create read file when missing" testReadFileMissing it "should switch to write file when read file missing" testReadFileMissingSwitch it "should create write file when missing" testWriteFileMissing it "should create read file when read and write files are missing" testReadAndWriteFilesMissing + describe "Journal message store: queue state backup expiration" $ do + it "should remove old queue state backups" testRemoveQueueStateBackups where someMsgStoreTests :: STMStoreClass s => SpecWith s someMsgStoreTests = do @@ -78,7 +84,9 @@ testJournalStoreCfg = maxMsgCount = 4, maxStateLines = 2, stateTailSize = 256, - idleInterval = 21600 + idleInterval = 21600, + expireBackupsAfter = 0, + keepMinBackups = 1 } mkMessage :: MonadIO m => ByteString -> m Message @@ -235,7 +243,7 @@ testQueueState ms = do state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) length . lines <$> readFile statePath `shouldReturn` 1 - readQueueState statePath `shouldReturn` state + readQueueState ms statePath `shouldReturn` (Just state, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup let state1 = @@ -246,7 +254,7 @@ testQueueState ms = do } withFile statePath AppendMode (`appendState` state1) length . lines <$> readFile statePath `shouldReturn` 2 - readQueueState statePath `shouldReturn` state1 + readQueueState ms statePath `shouldReturn` (Just state1, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup let state2 = @@ -258,28 +266,26 @@ testQueueState ms = do withFile statePath AppendMode (`appendState` state2) length . lines <$> readFile statePath `shouldReturn` 3 copyFile statePath (statePath <> ".2") - readQueueState statePath `shouldReturn` state2 - length <$> listDirectory dir `shouldReturn` 3 -- new state, copy + backup - length . lines <$> readFile statePath `shouldReturn` 1 + readQueueState ms statePath `shouldReturn` (Just state2, True) + length <$> listDirectory dir `shouldReturn` 2 -- new state + copy + ls <- lines <$> readFile statePath + length ls `shouldBe` 3 + -- mock compacting file + writeFile statePath $ last ls -- corrupt the only line corruptFile statePath - newState <- readQueueState statePath - newState `shouldBe` newMsgQueueState (journalId $ writeState newState) + (Nothing, True) <- readQueueState ms statePath -- corrupt the last line renameFile (statePath <> ".2") statePath removeOtherFiles dir statePath length . lines <$> readFile statePath `shouldReturn` 3 corruptFile statePath - readQueueState statePath `shouldReturn` state1 - length <$> listDirectory dir `shouldReturn` 2 - length . lines <$> readFile statePath `shouldReturn` 1 + readQueueState ms statePath `shouldReturn` (Just state1, True) + length <$> listDirectory dir `shouldReturn` 1 + length . lines <$> readFile statePath `shouldReturn` 3 where - readQueueState statePath = do - (state, h) <- readWriteQueueState ms statePath - hClose h - pure state corruptFile f = do s <- readFile f removeFile f @@ -315,6 +321,108 @@ testMessageState ms = do (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 liftIO $ closeMsgQueue q +testRemoveJournals :: JournalMsgStore -> IO () +testRemoveJournals ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + let dir = msgQueueDirectory ms rId + statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + write q s = writeMsg ms q True =<< mkMessage s + + runRight $ do + q <- ExceptT $ addQueue ms rId qr + Just (Message {msgId = mId1}, True) <- write q "message 1" + Just (Message {msgId = mId2}, False) <- write q "message 2" + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 + (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 + liftIO $ closeMsgQueue q + + ls <- B.lines <$> B.readFile statePath + length ls `shouldBe` 4 + journalFilesCount dir `shouldReturn` 1 + stateBackupCount dir `shouldReturn` 0 + + runRight $ do + q <- ExceptT $ getQueue ms SRecipient rId + -- not removed yet + liftIO $ journalFilesCount dir `shouldReturn` 1 + liftIO $ stateBackupCount dir `shouldReturn` 0 + Nothing <- tryPeekMsg ms q + -- still not removed, queue is empty and not opened + liftIO $ journalFilesCount dir `shouldReturn` 1 + _mq <- isolateQueue q "test" $ getMsgQueue ms q False + -- journal is removed + liftIO $ journalFilesCount dir `shouldReturn` 0 + liftIO $ stateBackupCount dir `shouldReturn` 1 + Just (Message {msgId = mId3}, True) <- write q "message 3" + -- journal is created + liftIO $ journalFilesCount dir `shouldReturn` 1 + Just (Message {msgId = mId4}, False) <- write q "message 4" + (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q mId3 + (Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4 + Just (Message {msgId = mId5}, True) <- write q "message 5" + Just (Message {msgId = mId6}, False) <- write q "message 6" + liftIO $ journalFilesCount dir `shouldReturn` 1 + Just (Message {msgId = mId7}, False) <- write q "message 7" + -- separate write journal is created + liftIO $ journalFilesCount dir `shouldReturn` 2 + Nothing <- write q "message 8" + (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms q mId5 + liftIO $ journalFilesCount dir `shouldReturn` 2 + (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms q mId6 + -- read journal is removed + liftIO $ journalFilesCount dir `shouldReturn` 1 + (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7 + (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 + liftIO $ closeMsgQueue q + + journalFilesCount dir `shouldReturn` 1 + runRight $ do + q <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, True) <- write q "message 8" + liftIO $ journalFilesCount dir `shouldReturn` 1 + liftIO $ stateBackupCount dir `shouldReturn` 2 + liftIO $ closeMsgQueue q + where + journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir + stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir + +testRemoveQueueStateBackups :: IO () +testRemoveQueueStateBackups = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g True + + ms' <- newMsgStore testJournalStoreCfg {maxStateLines = 1, expireBackupsAfter = 0, keepMinBackups = 0} + -- set expiration time 1 second ahead + let ms = ms' {expireBackupsBefore = addUTCTime 1 $ expireBackupsBefore ms'} + + let dir = msgQueueDirectory ms rId + write q s = writeMsg ms q True =<< mkMessage s + + runRight $ do + q <- ExceptT $ addQueue ms rId qr + Just (Message {msgId = mId1}, True) <- write q "message 1" + Just (Message {msgId = mId2}, False) <- write q "message 2" + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 + (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 + liftIO $ closeMsgQueue q + liftIO $ stateBackupCount dir `shouldReturn` 0 + + q1 <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, True) <- write q1 "message 3" + Just (Message {}, False) <- write q1 "message 4" + liftIO $ closeMsgQueue q1 + liftIO $ stateBackupCount dir `shouldReturn` 0 + + liftIO $ threadDelay 1000000 + q2 <- ExceptT $ getQueue ms SRecipient rId + Just (Message {}, False) <- write q2 "message 5" + Nothing <- write q2 "message 5" + liftIO $ closeMsgQueue q2 + liftIO $ stateBackupCount dir `shouldReturn` 1 + where + stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir + testReadFileMissing :: JournalMsgStore -> IO () testReadFileMissing ms = do g <- C.newRandom diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 000000000..2a51e23cd --- /dev/null +++ b/tests/README.md @@ -0,0 +1,7 @@ +# Running tests with coverage + +1. Uncomment coverage sections in cabal.project file. +2. Add `-fhpc` to ghc-options of simplexmq-test in simplexmq.cabal file. +3. Disable (`xit`) test "should subscribe to multiple (200) subscriptions with batching", enable (comment `skip`) the next test instead. +4. Run `cabal test`. +5. Open generated coverage report in the browser.