mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-05 08:56:14 +00:00
smp server: remove empty journals when opening message queue
This commit is contained in:
@@ -65,7 +65,7 @@ import Simplex.Messaging.Server.QueueStore.STM
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>))
|
||||
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
|
||||
import System.Directory
|
||||
import System.Exit
|
||||
import System.FilePath ((</>))
|
||||
@@ -491,9 +491,26 @@ isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
|
||||
openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue
|
||||
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do
|
||||
(st, sh) <- readWriteQueueState ms statePath
|
||||
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
|
||||
mkJournalQueue q st' (Just hs)
|
||||
let MsgQueueState {readState = rs, writeState = ws, size} = st
|
||||
if size == 0
|
||||
then E.uninterruptibleMask_ $ do
|
||||
-- If the queue is empty, state is backed up and journals are deleted
|
||||
-- They will be created again if queue is written to.
|
||||
-- canWrite is set to True.
|
||||
backupQueueState sh
|
||||
removeJournalIfExists dir rs
|
||||
unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
|
||||
st' <- newMsgQueueState <$> newJournalId (random ms)
|
||||
mkJournalQueue q st' Nothing
|
||||
else do
|
||||
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
|
||||
mkJournalQueue q st' (Just hs)
|
||||
where
|
||||
backupQueueState sh = do
|
||||
hClose sh
|
||||
ts <- getCurrentTime
|
||||
renameFile statePath (statePath <> "." <> iso8601Show ts <> ".bak")
|
||||
|
||||
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue
|
||||
mkJournalQueue queue st hs_ = do
|
||||
@@ -628,12 +645,20 @@ fixFileSize h pos = do
|
||||
| otherwise -> pure ()
|
||||
|
||||
removeJournal :: FilePath -> JournalState t -> IO ()
|
||||
removeJournal dir JournalState {journalId} = do
|
||||
removeJournal dir JournalState {journalId} = safeRemoveFile $ journalFilePath dir journalId
|
||||
|
||||
removeJournalIfExists :: FilePath -> JournalState t -> IO ()
|
||||
removeJournalIfExists dir JournalState {journalId} = do
|
||||
let path = journalFilePath dir journalId
|
||||
whenM (doesFileExist path) $ safeRemoveFile path
|
||||
|
||||
safeRemoveFile :: FilePath -> IO ()
|
||||
safeRemoveFile path =
|
||||
removeFile path `catchAny` (\e -> logError $ "STORE: removeJournal, " <> T.pack path <> ", " <> tshow e)
|
||||
|
||||
-- This function is supposed to be resilient to crashes while updating state files,
|
||||
-- and also resilient to crashes during its execution.
|
||||
-- TODO expire old state files
|
||||
readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle)
|
||||
readWriteQueueState JournalMsgStore {random, config} statePath =
|
||||
ifM
|
||||
|
||||
Reference in New Issue
Block a user