From 09bdd067c8dae561a12be815e858dfff5686fc99 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 16 Feb 2025 11:55:21 +0000 Subject: [PATCH] smp server: remove empty journals when opening message queue --- .../Messaging/Server/MsgStore/Journal.hs | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0834ad1d9..34ba0f06d 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -65,7 +65,7 @@ import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>)) +import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory import System.Exit import System.FilePath (()) @@ -491,9 +491,26 @@ isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do (st, sh) <- readWriteQueueState ms statePath - (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh - let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} - mkJournalQueue q st' (Just hs) + let MsgQueueState {readState = rs, writeState = ws, size} = st + if size == 0 + then E.uninterruptibleMask_ $ do + -- If the queue is empty, state is backed up and journals are deleted + -- They will be created again if queue is written to. + -- canWrite is set to True. + backupQueueState sh + removeJournalIfExists dir rs + unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws + st' <- newMsgQueueState <$> newJournalId (random ms) + mkJournalQueue q st' Nothing + else do + (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh + let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} + mkJournalQueue q st' (Just hs) + where + backupQueueState sh = do + hClose sh + ts <- getCurrentTime + renameFile statePath (statePath <> "." <> iso8601Show ts <> ".bak") mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue mkJournalQueue queue st hs_ = do @@ -628,12 +645,20 @@ fixFileSize h pos = do | otherwise -> pure () removeJournal :: FilePath -> JournalState t -> IO () -removeJournal dir JournalState {journalId} = do +removeJournal dir JournalState {journalId} = safeRemoveFile $ journalFilePath dir journalId + +removeJournalIfExists :: FilePath -> JournalState t -> IO () +removeJournalIfExists dir JournalState {journalId} = do let path = journalFilePath dir journalId + whenM (doesFileExist path) $ safeRemoveFile path + +safeRemoveFile :: FilePath -> IO () +safeRemoveFile path = removeFile path `catchAny` (\e -> logError $ "STORE: removeJournal, " <> T.pack path <> ", " <> tshow e) -- This function is supposed to be resilient to crashes while updating state files, -- and also resilient to crashes during its execution. +-- TODO expire old state files readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle) readWriteQueueState JournalMsgStore {random, config} statePath = ifM