From e2589bc7d4fde069d80467ce651a6e14ba6ad172 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 17 Feb 2025 18:01:01 +0000 Subject: [PATCH] refactor openMsgQueue to prevent extra state backups --- .../Messaging/Server/MsgStore/Journal.hs | 136 +++++++++--------- tests/CoreTests/MsgStoreTests.hs | 40 ++++-- 2 files changed, 93 insertions(+), 83 deletions(-) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0be726ab1..f16d4b49c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -28,7 +28,7 @@ module Simplex.Messaging.Server.MsgStore.Journal SJournalType (..), msgQueueDirectory, msgQueueStatePath, - readWriteQueueState, + readQueueState, newMsgQueueState, newJournalId, appendState, @@ -429,7 +429,6 @@ instance MsgStoreClass JournalMsgStore where createQueueDir = do createDirectoryIfMissing True queueDirectory sh <- openFile statePath AppendMode - B.hPutStr sh "" rh <- createNewJournal queueDirectory $ journalId rs let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} atomically $ writeTVar handles $ Just hs @@ -493,40 +492,69 @@ isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorTy isolateQueueId op ms rId = tryStore op rId . 
withLockMap (queueLocks ms) rId op openMsgQueue :: JournalMsgStore -> JMQueue -> Bool -> IO JournalMsgQueue -openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} forWrite = do - (st, sh) <- readWriteQueueState ms statePath - removeBackups ms q - if size st == 0 - then do - (st', hs_) <- removeJournals st sh - mkJournalQueue q st' hs_ - else do - (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh - let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} - mkJournalQueue q st' (Just hs) +openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do + (st_, shouldBackup) <- readQueueState ms statePath + case st_ of + Nothing -> do + st <- newMsgQueueState <$> newJournalId (random ms) + when shouldBackup $ backupQueueState statePath + mkJournalQueue q st Nothing + Just st + | size st == 0 -> do + (st', hs_) <- removeJournals st shouldBackup + mkJournalQueue q st' hs_ + | otherwise -> do + sh <- openBackupQueueState st shouldBackup + (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh + let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} + mkJournalQueue q st' (Just hs) where -- If the queue is empty, journals are deleted. -- New journal is created if queue is written to. -- canWrite is set to True. 
- removeJournals MsgQueueState {readState = rs, writeState = ws} sh = E.uninterruptibleMask_ $ do + removeJournals MsgQueueState {readState = rs, writeState = ws} shouldBackup = E.uninterruptibleMask_ $ do rjId <- newJournalId $ random ms - let st' = newMsgQueueState rjId + let st = newMsgQueueState rjId hs_ <- if forWrite - then Just <$> newJournalHandles st' rjId - else Nothing <$ backupQueueState + then Just <$> newJournalHandles st rjId + else Nothing <$ backupQueueState statePath removeJournalIfExists dir rs unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws - pure (st', hs_) + pure (st, hs_) where - newJournalHandles st' rjId = do - appendState_ sh st' + newJournalHandles st rjId = do + sh <- openBackupQueueState st shouldBackup + appendState_ sh st rh <- closeOnException sh $ createNewJournal dir rjId pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} - backupQueueState = do - hClose sh - ts <- getCurrentTime - renameFile statePath $ stateBackupPath statePath ts + openBackupQueueState st shouldBackup + | shouldBackup = do + -- State backup is made in two steps to mitigate the crash during the backup. + -- Temporary backup file will be used when it is present. + let tempBackup = statePath <> ".bak" + renameFile statePath tempBackup -- 1) temp backup + sh <- writeQueueState st -- 2) save state + backupQueueState tempBackup -- 3) timed backup + pure sh + | otherwise = openFile statePath AppendMode + backupQueueState path = do + ts <- getCurrentTime + renameFile path $ stateBackupPath statePath ts + removeBackups + writeQueueState st = do + sh <- openFile statePath AppendMode + closeOnException sh $ appendState sh st + pure sh + removeBackups = do + times <- sort . mapMaybe backupPathTime <$> listDirectory dir + let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times + mapM_ (safeRemoveFile "removeBackups" . 
stateBackupPath statePath) toDelete + where + JournalStoreConfig {expireBefore, keepMinBackups} = config + backupPathTime :: FilePath -> Maybe UTCTime + backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack + statePathPfx = T.pack $ takeFileName statePath <> "." mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue mkJournalQueue queue st hs_ = do @@ -683,56 +711,39 @@ handleError cxt path a = -- This function is supposed to be resilient to crashes while updating state files, -- and also resilient to crashes during its execution. -readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle) -readWriteQueueState JournalMsgStore {random, config} statePath = +readQueueState :: JournalMsgStore -> FilePath -> IO (Maybe MsgQueueState, Bool) +readQueueState JournalMsgStore {config} statePath = ifM (doesFileExist tempBackup) - (renameFile tempBackup statePath >> readQueueState) - (ifM (doesFileExist statePath) readQueueState writeNewQueueState) + (renameFile tempBackup statePath >> readState) + (ifM (doesFileExist statePath) readState $ pure (Nothing, False)) where tempBackup = statePath <> ".bak" - readQueueState = do + readState = do ls <- B.lines <$> readFileTail case ls of - [] -> writeNewQueueState + [] -> do + logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath + pure (Nothing, False) _ -> do - r@(st, _) <- useLastLine (length ls) True ls - unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st + r <- useLastLine (length ls) True ls + forM_ (fst r) $ \st -> + unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st pure r - writeNewQueueState = do - logWarn $ "STORE: readWriteQueueState, empty queue state - initialized, " <> T.pack statePath - st <- newMsgQueueState <$> newJournalId random - writeQueueState st useLastLine len 
isLastLine ls = case strDecode $ last ls of - Right st - | len > maxStateLines config || not isLastLine -> - backupWriteQueueState st - | otherwise -> do - -- when state file has fewer than maxStateLines, we don't compact it - sh <- openFile statePath AppendMode - pure (st, sh) + Right st -> + -- when state file has fewer than maxStateLines, we don't compact it + let shouldBackup = len > maxStateLines config || not isLastLine + in pure (Just st, shouldBackup) Left e -- if the last line failed to parse | isLastLine -> case init ls of -- or use the previous line [] -> do logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath - st <- newMsgQueueState <$> newJournalId random - backupWriteQueueState st + pure (Nothing, True) -- backup state file, because last line was invalid ls' -> do logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath useLastLine len False ls' | otherwise -> E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e - backupWriteQueueState st = do - -- State backup is made in two steps to mitigate the crash during the backup. - -- Temporary backup file will be used when it is present. - renameFile statePath tempBackup -- 1) temp backup - r <- writeQueueState st -- 2) save state - ts <- getCurrentTime - renameFile tempBackup $ stateBackupPath statePath ts -- 3) timed backup - pure r - writeQueueState st = do - sh <- openFile statePath AppendMode - closeOnException sh $ appendState sh st - pure (st, sh) readFileTail = IO.withFile statePath ReadMode $ \h -> do size <- IO.hFileSize h @@ -742,17 +753,6 @@ readWriteQueueState JournalMsgStore {random, config} statePath = then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz else B.hGet h (fromIntegral size) -removeBackups :: JournalMsgStore -> JMQueue -> IO () -removeBackups ms JMQueue {queueDirectory = dir, statePath} = do - times <- sort . 
mapMaybe backupPathTime <$> listDirectory dir - let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times - mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete - where - JournalMsgStore {config = JournalStoreConfig {expireBefore, keepMinBackups}} = ms - backupPathTime :: FilePath -> Maybe UTCTime - backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack - statePathPfx = T.pack $ takeFileName statePath <> "." - stateBackupPath :: FilePath -> UTCTime -> FilePath stateBackupPath statePath ts = statePath <> "." <> iso8601Show ts <> ".bak" diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 106825af8..58f1d1625 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -44,7 +44,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath ((</>)) -import System.IO (IOMode (..), hClose, withFile) +import System.IO (IOMode (..), withFile) import Test.Hspec msgStoreTests :: Spec @@ -244,7 +244,7 @@ testQueueState ms = do state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) length . lines <$> readFile statePath `shouldReturn` 1 - readQueueState statePath `shouldReturn` state + readQueueState ms statePath `shouldReturn` (Just state, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup let state1 = @@ -255,7 +255,7 @@ testQueueState ms = do } withFile statePath AppendMode (`appendState` state1) length . 
lines <$> readFile statePath `shouldReturn` 2 - readQueueState statePath `shouldReturn` state1 + readQueueState ms statePath `shouldReturn` (Just state1, False) length <$> listDirectory dir `shouldReturn` 1 -- no backup let state2 = @@ -267,28 +267,26 @@ testQueueState ms = do withFile statePath AppendMode (`appendState` state2) length . lines <$> readFile statePath `shouldReturn` 3 copyFile statePath (statePath <> ".2") - readQueueState statePath `shouldReturn` state2 - length <$> listDirectory dir `shouldReturn` 3 -- new state, copy + backup - length . lines <$> readFile statePath `shouldReturn` 1 + readQueueState ms statePath `shouldReturn` (Just state2, True) + length <$> listDirectory dir `shouldReturn` 2 -- new state + copy + ls <- lines <$> readFile statePath + length ls `shouldBe` 3 + -- mock compacting file + writeFile statePath $ last ls -- corrupt the only line corruptFile statePath - newState <- readQueueState statePath - newState `shouldBe` newMsgQueueState (journalId $ writeState newState) + (Nothing, True) <- readQueueState ms statePath -- corrupt the last line renameFile (statePath <> ".2") statePath removeOtherFiles dir statePath length . lines <$> readFile statePath `shouldReturn` 3 corruptFile statePath - readQueueState statePath `shouldReturn` state1 - length <$> listDirectory dir `shouldReturn` 2 - length . lines <$> readFile statePath `shouldReturn` 1 + readQueueState ms statePath `shouldReturn` (Just state1, False) + length <$> listDirectory dir `shouldReturn` 1 + length . 
lines <$> readFile statePath `shouldReturn` 3 where - readQueueState statePath = do - (state, h) <- readWriteQueueState ms statePath - hClose h - pure state corruptFile f = do s <- readFile f removeFile f @@ -343,47 +341,59 @@ testRemoveJournals ms = do ls <- B.lines <$> B.readFile statePath length ls `shouldBe` 4 journalFilesCount dir `shouldReturn` 1 + -- print "1" >> getLine runRight $ do q <- ExceptT $ getQueue ms SRecipient rId -- not removed yet liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "2" >> getLine Nothing <- tryPeekMsg ms q -- still not removed, queue is empty and not opened liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "3" >> getLine _mq <- isolateQueue q "test" $ getMsgQueue ms q False -- journal is removed liftIO $ journalFilesCount dir `shouldReturn` 0 + -- liftIO $ print "4" >> getLine Just (Message {msgId = mId3}, True) <- write q "message 3" -- journal is created liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "5" >> getLine Just (Message {msgId = mId4}, False) <- write q "message 4" (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q mId3 (Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4 Just (Message {msgId = mId5}, True) <- write q "message 5" Just (Message {msgId = mId6}, False) <- write q "message 6" liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "6" >> getLine Just (Message {msgId = mId7}, False) <- write q "message 7" -- separate write journal is created liftIO $ journalFilesCount dir `shouldReturn` 2 + -- liftIO $ print "7" >> getLine Nothing <- write q "message 8" (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms q mId5 liftIO $ journalFilesCount dir `shouldReturn` 2 + -- liftIO $ print "8" >> getLine (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms q mId6 -- read journal is removed liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "9" >> getLine (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg 
ms q mId7 (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 liftIO $ closeMsgQueue q journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "10" >> getLine runRight $ do q <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, True) <- write q "message 8" liftIO $ journalFilesCount dir `shouldReturn` 1 + -- liftIO $ print "11" >> getLine liftIO $ closeMsgQueue q where journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir + stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir testRemoveQueueStateBackups :: IO () testRemoveQueueStateBackups = do