mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 20:44:49 +00:00
remove statePath from queue record
This commit is contained in:
@@ -122,8 +122,7 @@ data JournalQueue (s :: MSType) = JournalQueue
|
||||
activeAt :: TVar Int64,
|
||||
-- Just True - empty, Just False - non-empty, Nothing - unknown
|
||||
isEmpty :: TVar (Maybe Bool),
|
||||
queueDirectory :: FilePath,
|
||||
statePath :: FilePath
|
||||
queueDirectory :: FilePath
|
||||
}
|
||||
|
||||
data JournalMsgQueue (s :: MSType) = JournalMsgQueue
|
||||
@@ -247,8 +246,6 @@ makeQueue st queueLock qr@QueueRec {recipientId} = do
|
||||
msgQueue_ <- newTVarIO Nothing
|
||||
activeAt <- newTVarIO 0
|
||||
isEmpty <- newTVarIO Nothing
|
||||
let dir = msgQueueDirectory st recipientId
|
||||
statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId)
|
||||
pure
|
||||
JournalQueue
|
||||
{ recipientId,
|
||||
@@ -257,10 +254,11 @@ makeQueue st queueLock qr@QueueRec {recipientId} = do
|
||||
msgQueue_,
|
||||
activeAt,
|
||||
isEmpty,
|
||||
queueDirectory = dir,
|
||||
statePath
|
||||
queueDirectory = msgQueueDirectory st recipientId
|
||||
}
|
||||
|
||||
-- statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId)
|
||||
|
||||
instance MsgStoreClass (JournalMsgStore s) where
|
||||
type StoreMonad (JournalMsgStore s) = StoreIO s
|
||||
type StoreQueue (JournalMsgStore s) = JournalQueue s
|
||||
@@ -543,7 +541,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
(msg :) <$> run msgs
|
||||
|
||||
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
|
||||
writeMsg ms q'@JournalQueue {queueDirectory, statePath} logState msg = isolateQueue q' "writeMsg" $ do
|
||||
writeMsg ms q'@JournalQueue {recipientId, queueDirectory = dir} logState msg = isolateQueue q' "writeMsg" $ do
|
||||
q <- getMsgQueue ms q'
|
||||
StoreIO $ (`E.finally` updateActiveAt q') $ do
|
||||
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
|
||||
@@ -576,16 +574,17 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen))
|
||||
where
|
||||
createQueueDir = do
|
||||
createDirectoryIfMissing True queueDirectory
|
||||
createDirectoryIfMissing True dir
|
||||
let statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId)
|
||||
sh <- openFile statePath AppendMode
|
||||
B.hPutStr sh ""
|
||||
rh <- createNewJournal queueDirectory $ journalId rs
|
||||
rh <- createNewJournal dir $ journalId rs
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
|
||||
atomically $ writeTVar handles $ Just hs
|
||||
pure hs
|
||||
switchWriteJournal hs = do
|
||||
journalId <- newJournalId $ random ms
|
||||
wh <- createNewJournal queueDirectory journalId
|
||||
wh <- createNewJournal dir journalId
|
||||
atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh}
|
||||
pure (newJournalState journalId, wh)
|
||||
|
||||
@@ -669,7 +668,8 @@ storeQueue_ JournalQueue {recipientId, queueDirectory} _q = pure () -- save queu
|
||||
_queuePath = queueRecPath queueDirectory $ B.unpack (strEncode recipientId)
|
||||
|
||||
openMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO (JournalMsgQueue s)
|
||||
openMsgQueue ms JournalQueue {queueDirectory = dir, statePath} = do
|
||||
openMsgQueue ms JournalQueue {recipientId, queueDirectory = dir} = do
|
||||
let statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId)
|
||||
(st, sh) <- readWriteQueueState ms statePath
|
||||
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
|
||||
|
||||
Reference in New Issue
Block a user