refactor openMsgQueue to prevent extra state backups

This commit is contained in:
Evgeny Poberezkin
2025-02-17 18:01:01 +00:00
parent d10eb0e56e
commit e2589bc7d4
2 changed files with 93 additions and 83 deletions
@@ -28,7 +28,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
SJournalType (..),
msgQueueDirectory,
msgQueueStatePath,
readWriteQueueState,
readQueueState,
newMsgQueueState,
newJournalId,
appendState,
@@ -429,7 +429,6 @@ instance MsgStoreClass JournalMsgStore where
createQueueDir = do
createDirectoryIfMissing True queueDirectory
sh <- openFile statePath AppendMode
B.hPutStr sh ""
rh <- createNewJournal queueDirectory $ journalId rs
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
atomically $ writeTVar handles $ Just hs
@@ -493,40 +492,69 @@ isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorTy
isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
openMsgQueue :: JournalMsgStore -> JMQueue -> Bool -> IO JournalMsgQueue
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} forWrite = do
(st, sh) <- readWriteQueueState ms statePath
removeBackups ms q
if size st == 0
then do
(st', hs_) <- removeJournals st sh
mkJournalQueue q st' hs_
else do
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
mkJournalQueue q st' (Just hs)
openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, statePath} forWrite = do
(st_, shouldBackup) <- readQueueState ms statePath
case st_ of
Nothing -> do
st <- newMsgQueueState <$> newJournalId (random ms)
when shouldBackup $ backupQueueState statePath
mkJournalQueue q st Nothing
Just st
| size st == 0 -> do
(st', hs_) <- removeJournals st shouldBackup
mkJournalQueue q st' hs_
| otherwise -> do
sh <- openBackupQueueState st shouldBackup
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
mkJournalQueue q st' (Just hs)
where
-- If the queue is empty, journals are deleted.
-- New journal is created if queue is written to.
-- canWrite is set to True.
removeJournals MsgQueueState {readState = rs, writeState = ws} sh = E.uninterruptibleMask_ $ do
removeJournals MsgQueueState {readState = rs, writeState = ws} shouldBackup = E.uninterruptibleMask_ $ do
rjId <- newJournalId $ random ms
let st' = newMsgQueueState rjId
let st = newMsgQueueState rjId
hs_ <-
if forWrite
then Just <$> newJournalHandles st' rjId
else Nothing <$ backupQueueState
then Just <$> newJournalHandles st rjId
else Nothing <$ backupQueueState statePath
removeJournalIfExists dir rs
unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
pure (st', hs_)
pure (st, hs_)
where
newJournalHandles st' rjId = do
appendState_ sh st'
newJournalHandles st rjId = do
sh <- openBackupQueueState st shouldBackup
appendState_ sh st
rh <- closeOnException sh $ createNewJournal dir rjId
pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
backupQueueState = do
hClose sh
ts <- getCurrentTime
renameFile statePath $ stateBackupPath statePath ts
openBackupQueueState st shouldBackup
| shouldBackup = do
-- State backup is made in two steps to mitigate the crash during the backup.
-- Temporary backup file will be used when it is present.
let tempBackup = statePath <> ".bak"
renameFile statePath tempBackup -- 1) temp backup
sh <- writeQueueState st -- 2) save state
backupQueueState tempBackup -- 3) timed backup
pure sh
| otherwise = openFile statePath AppendMode
backupQueueState path = do
ts <- getCurrentTime
renameFile path $ stateBackupPath statePath ts
removeBackups
writeQueueState st = do
sh <- openFile statePath AppendMode
closeOnException sh $ appendState sh st
pure sh
removeBackups = do
times <- sort . mapMaybe backupPathTime <$> listDirectory dir
let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times
mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
where
JournalStoreConfig {expireBefore, keepMinBackups} = config
backupPathTime :: FilePath -> Maybe UTCTime
backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
statePathPfx = T.pack $ takeFileName statePath <> "."
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue
mkJournalQueue queue st hs_ = do
@@ -683,56 +711,39 @@ handleError cxt path a =
-- This function is supposed to be resilient to crashes while updating state files,
-- and also resilient to crashes during its execution.
readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle)
readWriteQueueState JournalMsgStore {random, config} statePath =
readQueueState :: JournalMsgStore -> FilePath -> IO (Maybe MsgQueueState, Bool)
readQueueState JournalMsgStore {config} statePath =
ifM
(doesFileExist tempBackup)
(renameFile tempBackup statePath >> readQueueState)
(ifM (doesFileExist statePath) readQueueState writeNewQueueState)
(renameFile tempBackup statePath >> readState)
(ifM (doesFileExist statePath) readState $ pure (Nothing, False))
where
tempBackup = statePath <> ".bak"
readQueueState = do
readState = do
ls <- B.lines <$> readFileTail
case ls of
[] -> writeNewQueueState
[] -> do
logWarn $ "STORE: readWriteQueueState, empty queue state, " <> T.pack statePath
pure (Nothing, False)
_ -> do
r@(st, _) <- useLastLine (length ls) True ls
unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st
r <- useLastLine (length ls) True ls
forM_ (fst r) $ \st ->
unless (validQueueState st) $ E.throwIO $ userError $ "readWriteQueueState inconsistent state: " <> show st
pure r
writeNewQueueState = do
logWarn $ "STORE: readWriteQueueState, empty queue state - initialized, " <> T.pack statePath
st <- newMsgQueueState <$> newJournalId random
writeQueueState st
useLastLine len isLastLine ls = case strDecode $ last ls of
Right st
| len > maxStateLines config || not isLastLine ->
backupWriteQueueState st
| otherwise -> do
-- when state file has fewer than maxStateLines, we don't compact it
sh <- openFile statePath AppendMode
pure (st, sh)
Right st ->
-- when state file has fewer than maxStateLines, we don't compact it
let shouldBackup = len > maxStateLines config || not isLastLine
in pure (Just st, shouldBackup)
Left e -- if the last line failed to parse
| isLastLine -> case init ls of -- or use the previous line
[] -> do
logWarn $ "STORE: readWriteQueueState, invalid 1-line queue state - initialized, " <> T.pack statePath
st <- newMsgQueueState <$> newJournalId random
backupWriteQueueState st
pure (Nothing, True) -- backup state file, because last line was invalid
ls' -> do
logWarn $ "STORE: readWriteQueueState, invalid last line in queue state - using the previous line, " <> T.pack statePath
useLastLine len False ls'
| otherwise -> E.throwIO $ userError $ "readWriteQueueState invalid state " <> statePath <> ": " <> show e
backupWriteQueueState st = do
-- State backup is made in two steps to mitigate the crash during the backup.
-- Temporary backup file will be used when it is present.
renameFile statePath tempBackup -- 1) temp backup
r <- writeQueueState st -- 2) save state
ts <- getCurrentTime
renameFile tempBackup $ stateBackupPath statePath ts -- 3) timed backup
pure r
writeQueueState st = do
sh <- openFile statePath AppendMode
closeOnException sh $ appendState sh st
pure (st, sh)
readFileTail =
IO.withFile statePath ReadMode $ \h -> do
size <- IO.hFileSize h
@@ -742,17 +753,6 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz
else B.hGet h (fromIntegral size)
removeBackups :: JournalMsgStore -> JMQueue -> IO ()
removeBackups ms JMQueue {queueDirectory = dir, statePath} = do
times <- sort . mapMaybe backupPathTime <$> listDirectory dir
let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times
mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
where
JournalMsgStore {config = JournalStoreConfig {expireBefore, keepMinBackups}} = ms
backupPathTime :: FilePath -> Maybe UTCTime
backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
statePathPfx = T.pack $ takeFileName statePath <> "."
stateBackupPath :: FilePath -> UTCTime -> FilePath
stateBackupPath statePath ts = statePath <> "." <> iso8601Show ts <> ".bak"
+25 -15
View File
@@ -44,7 +44,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue)
import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2)
import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile)
import System.FilePath ((</>))
import System.IO (IOMode (..), hClose, withFile)
import System.IO (IOMode (..), withFile)
import Test.Hspec
msgStoreTests :: Spec
@@ -244,7 +244,7 @@ testQueueState ms = do
state <- newMsgQueueState <$> newJournalId (random ms)
withFile statePath WriteMode (`appendState` state)
length . lines <$> readFile statePath `shouldReturn` 1
readQueueState statePath `shouldReturn` state
readQueueState ms statePath `shouldReturn` (Just state, False)
length <$> listDirectory dir `shouldReturn` 1 -- no backup
let state1 =
@@ -255,7 +255,7 @@ testQueueState ms = do
}
withFile statePath AppendMode (`appendState` state1)
length . lines <$> readFile statePath `shouldReturn` 2
readQueueState statePath `shouldReturn` state1
readQueueState ms statePath `shouldReturn` (Just state1, False)
length <$> listDirectory dir `shouldReturn` 1 -- no backup
let state2 =
@@ -267,28 +267,26 @@ testQueueState ms = do
withFile statePath AppendMode (`appendState` state2)
length . lines <$> readFile statePath `shouldReturn` 3
copyFile statePath (statePath <> ".2")
readQueueState statePath `shouldReturn` state2
length <$> listDirectory dir `shouldReturn` 3 -- new state, copy + backup
length . lines <$> readFile statePath `shouldReturn` 1
readQueueState ms statePath `shouldReturn` (Just state2, True)
length <$> listDirectory dir `shouldReturn` 2 -- new state + copy
ls <- lines <$> readFile statePath
length ls `shouldBe` 3
-- mock compacting file
writeFile statePath $ last ls
-- corrupt the only line
corruptFile statePath
newState <- readQueueState statePath
newState `shouldBe` newMsgQueueState (journalId $ writeState newState)
(Nothing, True) <- readQueueState ms statePath
-- corrupt the last line
renameFile (statePath <> ".2") statePath
removeOtherFiles dir statePath
length . lines <$> readFile statePath `shouldReturn` 3
corruptFile statePath
readQueueState statePath `shouldReturn` state1
length <$> listDirectory dir `shouldReturn` 2
length . lines <$> readFile statePath `shouldReturn` 1
readQueueState ms statePath `shouldReturn` (Just state1, False)
length <$> listDirectory dir `shouldReturn` 1
length . lines <$> readFile statePath `shouldReturn` 3
where
readQueueState statePath = do
(state, h) <- readWriteQueueState ms statePath
hClose h
pure state
corruptFile f = do
s <- readFile f
removeFile f
@@ -343,47 +341,59 @@ testRemoveJournals ms = do
ls <- B.lines <$> B.readFile statePath
length ls `shouldBe` 4
journalFilesCount dir `shouldReturn` 1
-- print "1" >> getLine
runRight $ do
q <- ExceptT $ getQueue ms SRecipient rId
-- not removed yet
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "2" >> getLine
Nothing <- tryPeekMsg ms q
-- still not removed, queue is empty and not opened
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "3" >> getLine
_mq <- isolateQueue q "test" $ getMsgQueue ms q False
-- journal is removed
liftIO $ journalFilesCount dir `shouldReturn` 0
-- liftIO $ print "4" >> getLine
Just (Message {msgId = mId3}, True) <- write q "message 3"
-- journal is created
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "5" >> getLine
Just (Message {msgId = mId4}, False) <- write q "message 4"
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q mId3
(Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4
Just (Message {msgId = mId5}, True) <- write q "message 5"
Just (Message {msgId = mId6}, False) <- write q "message 6"
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "6" >> getLine
Just (Message {msgId = mId7}, False) <- write q "message 7"
-- separate write journal is created
liftIO $ journalFilesCount dir `shouldReturn` 2
-- liftIO $ print "7" >> getLine
Nothing <- write q "message 8"
(Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms q mId5
liftIO $ journalFilesCount dir `shouldReturn` 2
-- liftIO $ print "8" >> getLine
(Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms q mId6
-- read journal is removed
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "9" >> getLine
(Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7
(Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8
liftIO $ closeMsgQueue q
journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "10" >> getLine
runRight $ do
q <- ExceptT $ getQueue ms SRecipient rId
Just (Message {}, True) <- write q "message 8"
liftIO $ journalFilesCount dir `shouldReturn` 1
-- liftIO $ print "11" >> getLine
liftIO $ closeMsgQueue q
where
journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir
stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir
testRemoveQueueStateBackups :: IO ()
testRemoveQueueStateBackups = do