From c8cc2f262b5df85fafaed05a8c8fd103e5ef822e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 15 Dec 2024 21:59:34 +0000 Subject: [PATCH] journal store works, some tests pass --- rfcs/2024-11-25-queue-blobs-2.md | 6 +- src/Simplex/Messaging/Server.hs | 6 +- .../Messaging/Server/MsgStore/Journal.hs | 231 +++++++++++------- src/Simplex/Messaging/Server/MsgStore/STM.hs | 6 +- .../Messaging/Server/MsgStore/Types.hs | 10 +- .../Messaging/Server/QueueStore/STM.hs | 6 + tests/CoreTests/MsgStoreTests.hs | 5 +- tests/Test.hs | 4 +- 8 files changed, 166 insertions(+), 108 deletions(-) diff --git a/rfcs/2024-11-25-queue-blobs-2.md b/rfcs/2024-11-25-queue-blobs-2.md index a4913c118..143687da0 100644 --- a/rfcs/2024-11-25-queue-blobs-2.md +++ b/rfcs/2024-11-25-queue-blobs-2.md @@ -43,9 +43,9 @@ Additional suggestion to reduce probability of queue_state.log and queue_rec.log - check the last byte of the file and log EOL if it is not EOL. Probably cleanest approach, but with a small performance cost. If queue folder is a reference to the queue, it may have one of these files: -- notifier.id -- sender.id -- link.id +- notifier.ref +- sender.ref +- link.ref These files would contain a one line with the recipient ID of the queue. These files would never change, they can only be deleted when queue is deleted or when notifier/link is deleted. diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 544f769b9..a2759ba8f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1786,8 +1786,8 @@ importMessages tty ms f old_ = do renameFile f $ f <> ".bak" mapM_ setOverQuota_ overQuota logQueueStates ms - storedQueues <- M.size <$> readTVarIO (activeMsgQueues ms) - pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} + QueueCounts {queueCount} <- queueCounts ms + pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = queueCount} where progress i = "Processed " <> show i <> " lines" restoreMsg :: (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) -> LB.ByteString -> ExceptT String IO (Int, Maybe (RecipientId, StoreQueue s), (Int, Int, M.Map RecipientId (StoreQueue s))) @@ -1895,7 +1895,7 @@ restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >> Right d@ServerStatsData {_qCount = statsQCount, _msgCount = statsMsgCount, _ntfCount = statsNtfCount} -> do s <- asks serverStats AMS _ st <- asks msgStore - _qCount <- M.size <$> readTVarIO (activeMsgQueues st) + QueueCounts {queueCount = _qCount} <- liftIO $ queueCounts st let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_ _ntfCount = storedMsgsCount ntfStats _msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_ diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 2074ef493..d9faaf38e 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -46,6 +46,7 @@ import Control.Logger.Simple import Control.Monad import Control.Monad.Trans.Except import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) @@ -68,7 +69,7 @@ import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>=), (<$$>)) +import Simplex.Messaging.Util (anyM, ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory import System.Exit import System.FilePath (()) @@ -225,6 +226,16 @@ msgLogFileName = "messages" logFileExt :: String logFileExt = ".log" +data QueueRef = QRSender | QRNotifier + +queueRefFileName :: QueueRef -> String +queueRefFileName = \case + QRSender -> "sender" + QRNotifier -> "notifier" + +queueRefFileExt :: String +queueRefFileExt = ".ref" + newtype StoreIO (s :: MSType) a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) @@ -232,10 +243,11 @@ instance STMStoreClass (JournalMsgStore 'MSHybrid) where stmQueueStore JournalMsgStore {queueStore = MQStore st} = st mkQueue st rId qr = do lock <- atomically $ getMapLock (queueLocks st) rId - makeQueue st lock rId qr + let dir = msgQueueDirectory st rId + makeQueue dir lock rId qr -makeQueue :: JournalMsgStore s -> Lock -> RecipientId -> QueueRec -> IO (JournalQueue s) -makeQueue st queueLock rId qr = do +makeQueue :: FilePath -> Lock -> RecipientId -> QueueRec -> IO (JournalQueue s) +makeQueue queueDirectory queueLock rId qr = do queueRec <- newTVarIO $ Just qr msgQueue_ <- newTVarIO Nothing activeAt <- newTVarIO 0 @@ -248,7 +260,7 @@ makeQueue st queueLock rId qr = do msgQueue_, activeAt, isEmpty, - queueDirectory = msgQueueDirectory st rId + queueDirectory } instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where @@ -276,12 +288,15 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where MQStore st -> do readTVarIO (storeLog st) >>= mapM_ closeStoreLog readTVarIO (queues st) >>= mapM_ closeMsgQueue - JQStore {queues_} -> - readTVarIO queues_ >>= mapM_ (mapM closeMsgQueue) + st@JQStore {} -> + readTVarIO (queues_ st) >>= mapM_ (mapM closeMsgQueue) - activeMsgQueues ms = case queueStore ms of - MQStore st -> queues st - JQStore {} -> undefined -- TODO [queues] + withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a + withActiveMsgQueues ms f = case queueStore ms of + MQStore st -> withQueues st f + st@JQStore {} -> readTVarIO (queues_ st) >>= foldM run mempty + where + run !acc = maybe (pure acc) (fmap (acc <>) . f) -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. @@ -352,25 +367,25 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where queueCount <- M.size <$> readTVarIO (queues st) notifierCount <- M.size <$> readTVarIO (notifiers st) pure QueueCounts {queueCount, notifierCount} - JQStore {queues_, notifiers_} -> do - queueCount <- M.size <$> readTVarIO queues_ - notifierCount <- M.size <$> readTVarIO notifiers_ + st@JQStore {} -> do + queueCount <- M.size <$> readTVarIO (queues_ st) + notifierCount <- M.size <$> readTVarIO (notifiers_ st) pure QueueCounts {queueCount, notifierCount} addQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (Either ErrorType (JournalQueue s)) - addQueue st@JournalMsgStore {queueLocks = ls} rId qr@QueueRec {senderId = sId, notifier} = case queueStore st of - MQStore {} -> addQueue' st rId qr + addQueue ms@JournalMsgStore {queueLocks = ls} rId qr@QueueRec {senderId = sId, notifier} = case queueStore ms of + MQStore {} -> addQueue' ms rId qr JQStore {queues_, senders_, notifiers_} -> do lock <- atomically $ getMapLock ls rId tryStore "addQueue" rId $ withLock' lock "addQueue" $ withLockMap ls sId "addQueueS" $ withNotifierLock $ ifM hasAnyId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do - q <- makeQueue st lock rId qr - -- TODO [queues] maybe rename to createQueueDir - storeQueue_ q qr + let dir = msgQueueDirectory ms rId + q <- makeQueue dir lock rId qr + storeNewQueue q qr atomically $ TM.insert rId (Just q) queues_ - saveQueueRef st sId rId senders_ - forM_ notifier $ \NtfCreds {notifierId} -> saveQueueRef st notifierId rId notifiers_ + saveQueueRef ms QRSender sId rId senders_ + forM_ notifier $ \NtfCreds {notifierId} -> saveQueueRef ms QRNotifier notifierId rId notifiers_ pure $ Right q where hasAnyId = anyM [hasId rId queues_, hasId sId senders_, withNotifier (`hasId` notifiers_), hasDir rId, hasDir sId, withNotifier hasDir] @@ -378,23 +393,50 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where withNotifierLock a = maybe a (\NtfCreds {notifierId} -> withLockMap ls notifierId "addQueueN" a) notifier hasId :: EntityId -> TMap EntityId (Maybe a) -> IO Bool hasId qId m = maybe False isJust <$> atomically (TM.lookup qId m) - hasDir qId = doesDirectoryExist $ msgQueueDirectory st qId + hasDir qId = doesDirectoryExist $ msgQueueDirectory ms qId getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s)) - getQueue st party qId = case queueStore st of - MQStore st' -> getQueue' st' party qId - JQStore {queues_, senders_, notifiers_} -> - isolateQueueId "getQueue" st qId $ - maybe (Left AUTH) Right <$> case party of + getQueue ms party qId = case queueStore ms of + MQStore st -> getQueue' st party qId + st@JQStore {queues_} -> + isolateQueueId "getQueue" ms qId $ + case party of SRecipient -> getQueue_ qId - SSender -> getQueueRef senders_ $>>= getQueue_ - SNotifier -> getQueueRef notifiers_ $>>= getQueue_ + SSender -> getQueueRef QRSender (senders_ st) $>>= isolateGetQueue + SNotifier -> getQueueRef QRNotifier (notifiers_ st) $>>= isolateGetQueue where - getQueue_ rId = TM.lookupIO rId queues_ >>= maybe (loadQueue rId) pure - getQueueRef :: TMap EntityId (Maybe RecipientId) -> IO (Maybe RecipientId) - getQueueRef m = TM.lookupIO qId m >>= maybe (loadQueueRef m) pure - loadQueue _rId = undefined -- TODO [queues] load, cache, return queue - loadQueueRef _m = undefined -- TODO [queues] load, cache, return queue ID + isolateGetQueue rId = isolateQueueId "getQueueR" ms rId $ getQueue_ rId + getQueue_ rId = TM.lookupIO rId queues_ >>= maybe loadQueue (pure . maybe (Left AUTH) Right) + where + loadQueue = do + let dir = msgQueueDirectory ms rId + f = queueRecPath dir rId + ifM (doesFileExist f) (load dir f) $ do + atomically $ TM.insert rId Nothing queues_ + pure $ Left AUTH + load dir f = do + -- TODO [queues] read backup if exists, remove old timed backups + qr_ <- first STORE . strDecode <$> B.readFile f + forM qr_ $ \qr -> do + lock <- atomically $ getMapLock (queueLocks ms) rId + q <- makeQueue dir lock rId qr + atomically $ TM.insert rId (Just q) queues_ + pure q + getQueueRef :: QueueRef -> TMap EntityId (Maybe RecipientId) -> IO (Either ErrorType RecipientId) + getQueueRef qRef m = TM.lookupIO qId m >>= maybe loadQueueRef (pure . maybe (Left AUTH) Right) + where + loadQueueRef = do + let dir = msgQueueDirectory ms qId + f = queueRefPath dir qRef qId + ifM (doesFileExist f) (loadRef f) $ do + atomically $ TM.insert qId Nothing m + pure $ Left AUTH + loadRef f = do + -- TODO [queues] read backup if exists, remove old timed backups + rId_ <- first STORE . strDecode <$> B.readFile f + forM rId_ $ \rId -> do + atomically $ TM.insert qId (Just rId) m + pure rId secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = case queueStore st of @@ -410,12 +452,12 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where JQStore {notifiers_} -> isolateQueueRec sq "addQueueNotifier" $ \q -> withLockMap (queueLocks st) nId "addQueueNotifierN" $ - ifM hasNotifierId (pure $ Left DUPLICATE_) $ do + ifM hasNotifierId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do nId_ <- forM (notifier q) $ \NtfCreds {notifierId = nId'} -> withLockMap (queueLocks st) nId' "addQueueNotifierD" $ - deleteQueueRef st nId' notifiers_ $> nId' + deleteQueueRef st QRNotifier nId' notifiers_ $> nId' storeQueue sq q {notifier = Just ntfCreds} - saveQueueRef st nId (recipientId sq) notifiers_ + saveQueueRef st QRNotifier nId (recipientId sq) notifiers_ pure $ Right nId_ where hasNotifierId = anyM [hasId, hasDir] @@ -423,26 +465,26 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where hasDir = doesDirectoryExist $ msgQueueDirectory st nId deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId)) - deleteQueueNotifier st sq = case queueStore st of - MQStore st' -> deleteQueueNotifier' st' sq - JQStore {notifiers_} -> + deleteQueueNotifier ms sq = case queueStore ms of + MQStore st -> deleteQueueNotifier' st sq + st@JQStore {} -> isolateQueueRec sq "deleteQueueNotifier" $ \q -> fmap Right $ forM (notifier q) $ \NtfCreds {notifierId = nId} -> - withLockMap (queueLocks st) nId "deleteQueueNotifierN" $ do - deleteQueueRef st nId notifiers_ + withLockMap (queueLocks ms) nId "deleteQueueNotifierN" $ do + deleteQueueRef ms QRNotifier nId (notifiers_ st) storeQueue sq q {notifier = Nothing} pure nId suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ()) - suspendQueue st sq = case queueStore st of - MQStore st' -> suspendQueue' st' sq + suspendQueue ms sq = case queueStore ms of + MQStore st -> suspendQueue' st sq JQStore {} -> isolateQueueRec sq "suspendQueue" $ \q -> fmap Right $ storeQueue sq q {status = QueueOff} updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) - updateQueueTime st sq t = case queueStore st of - MQStore st' -> updateQueueTime' st' sq t + updateQueueTime ms sq t = case queueStore ms of + MQStore st -> updateQueueTime' st sq t JQStore {} -> isolateQueueRec sq "updateQueueTime" $ fmap Right . update where update q@QueueRec {updatedAt} @@ -452,13 +494,12 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where in storeQueue sq q' $> q' getMsgQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (JournalMsgQueue s) - getMsgQueue ms@JournalMsgStore {random} sq@JournalQueue {msgQueue_, queueDirectory} = + getMsgQueue ms@JournalMsgStore {random} sq@JournalQueue {recipientId, msgQueue_, queueDirectory} = StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where newQ = do - -- TODO [queues] this should account for the possibility that the folder exists, - -- but queue messaging files do not, which will always be the case when queue record is in journals - q <- ifM (doesDirectoryExist queueDirectory) (openMsgQueue ms sq) createQ + let statePath = msgQueueStatePath queueDirectory recipientId + q <- ifM (doesFileExist statePath) (openMsgQueue ms sq statePath) createQ atomically $ writeTVar msgQueue_ $ Just q pure q where @@ -551,6 +592,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where writeToJournal q@JournalMsgQueue {handles} st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do let msgStr = strEncode msg' `B.snoc` '\n' msgLen = fromIntegral $ B.length msgStr + -- TODO [queues] this should just work, if queue was not opened it will be created, and folder will be used if exists hs <- maybe createQueueDir pure =<< readTVarIO handles (ws, wh) <- case writeHandle hs of Nothing | msgCount writeState >= maxMsgCount -> switchWriteJournal hs @@ -566,7 +608,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where where createQueueDir = do createDirectoryIfMissing True dir - let statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId) + let statePath = msgQueueStatePath dir recipientId sh <- openFile statePath AppendMode B.hPutStr sh "" rh <- createNewJournal dir $ journalId rs @@ -637,36 +679,52 @@ isolateQueue_ JournalQueue {recipientId, queueLock} op = tryStore op recipientId isolateQueueId :: String -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> IO (Either ErrorType a) isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op +storeNewQueue :: JournalQueue s -> QueueRec -> IO () +storeNewQueue sq@JournalQueue {queueDirectory} q = do + createDirectoryIfMissing True queueDirectory + storeQueue_ sq q + storeQueue :: JournalQueue s -> QueueRec -> IO () storeQueue sq@JournalQueue {queueRec} q = do storeQueue_ sq q atomically $ writeTVar queueRec $ Just q --- TODO [queues] -deleteQueueDir :: JournalQueue s -> IO () -deleteQueueDir _sq = pure () - --- TODO [queues] -saveQueueRef :: JournalMsgStore s -> QueueId -> RecipientId -> TMap QueueId (Maybe RecipientId) -> IO () -saveQueueRef _st qId rId m = do - pure () -- save ref to disk +saveQueueRef :: JournalMsgStore 'MSJournal -> QueueRef -> QueueId -> RecipientId -> TMap QueueId (Maybe RecipientId) -> IO () +saveQueueRef st qRef qId rId m = do + let dir = msgQueueDirectory st qId + f = queueRefPath dir qRef qId + createDirectoryIfMissing True dir + safeReplaceFile f $ strEncode rId atomically $ TM.insert qId (Just rId) m --- TODO [queues] -deleteQueueRef :: JournalMsgStore s -> QueueId -> TMap QueueId (Maybe RecipientId) -> IO () -deleteQueueRef _st qId m = do - pure () -- delete ref from disk +deleteQueueRef :: JournalMsgStore 'MSJournal -> QueueRef -> QueueId -> TMap QueueId (Maybe RecipientId) -> IO () +deleteQueueRef st qRef qId m = do + let dir = msgQueueDirectory st qId + f = queueRefPath dir qRef qId + whenM (doesFileExist f) $ removeFile f atomically $ TM.delete qId m --- TODO [queues] storeQueue_ :: JournalQueue s -> QueueRec -> IO () -storeQueue_ JournalQueue {recipientId, queueDirectory} _q = pure () -- save queue to disk - where - _queuePath = queueRecPath queueDirectory $ B.unpack (strEncode recipientId) +storeQueue_ JournalQueue {recipientId, queueDirectory} q = do + let f = queueRecPath queueDirectory recipientId + safeReplaceFile f $ strEncode q -openMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO (JournalMsgQueue s) -openMsgQueue ms JournalQueue {recipientId, queueDirectory = dir} = do - let statePath = msgQueueStatePath dir $ B.unpack (strEncode recipientId) +safeReplaceFile :: FilePath -> ByteString -> IO () +safeReplaceFile f s = ifM (doesFileExist f) replace (B.writeFile f s) + where + tempBackup = f <> ".bak" + replace = do + renameFile f tempBackup + B.writeFile f s + renameFile tempBackup =<< timedBackupName f + +timedBackupName :: FilePath -> IO FilePath +timedBackupName f = do + ts <- getCurrentTime + pure $ f <> "." <> iso8601Show ts <> ".bak" + +openMsgQueue :: JournalMsgStore s -> JournalQueue s -> FilePath -> IO (JournalMsgQueue s) +openMsgQueue ms JournalQueue {queueDirectory = dir} statePath = do (st, sh) <- readWriteQueueState ms statePath (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} @@ -714,8 +772,8 @@ updateReadPos q log' len hs = do updateQueueState q log' hs st' $ writeTVar (tipMsg q) Nothing msgQueueDirectory :: JournalMsgStore s -> RecipientId -> FilePath -msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} rId = - storePath B.unpack (B.intercalate "/" $ splitSegments pathParts $ strEncode rId) +msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} qId = + storePath B.unpack (B.intercalate "/" $ splitSegments pathParts $ strEncode qId) where splitSegments _ "" = [] splitSegments 1 s = [s] @@ -723,11 +781,14 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP let (seg, s') = B.splitAt 2 s in seg : splitSegments (n - 1) s' -queueRecPath :: FilePath -> String -> FilePath -queueRecPath dir queueId = dir (queueRecFileName <> "." <> queueId <> logFileExt) +queueRecPath :: FilePath -> RecipientId -> FilePath +queueRecPath dir rId = dir (queueRecFileName <> "." <> B.unpack (strEncode rId) <> logFileExt) -msgQueueStatePath :: FilePath -> String -> FilePath -msgQueueStatePath dir queueId = dir (queueLogFileName <> "." <> queueId <> logFileExt) +queueRefPath :: FilePath -> QueueRef -> QueueId -> FilePath +queueRefPath dir qRef qId = dir (queueRefFileName qRef <> "." <> B.unpack (strEncode qId) <> queueRefFileExt) + +msgQueueStatePath :: FilePath -> RecipientId -> FilePath +msgQueueStatePath dir rId = dir (queueLogFileName <> "." <> B.unpack (strEncode rId) <> logFileExt) createNewJournal :: FilePath -> ByteString -> IO Handle createNewJournal dir journalId = do @@ -855,8 +916,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath = -- Temporary backup file will be used when it is present. renameFile statePath tempBackup -- 1) temp backup r <- writeQueueState st -- 2) save state - ts <- getCurrentTime - renameFile tempBackup (statePath <> "." <> iso8601Show ts <> ".bak") -- 3) timed backup + renameFile tempBackup =<< timedBackupName statePath -- 3) timed backup pure r writeQueueState st = do sh <- openFile statePath AppendMode @@ -891,27 +951,26 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && bytePos ws == byteCount ws deleteQueue_ :: forall s. JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) -deleteQueue_ st sq = - isolateQueueId "deleteQueue_" st rId $ +deleteQueue_ ms sq = + isolateQueueId "deleteQueue_" ms rId $ E.uninterruptibleMask_ $ delete >>= mapM (traverse remove) where rId = recipientId sq qr = queueRec sq delete :: IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) - delete = case queueStore st of - MQStore st' -> deleteQueue' st' sq - JQStore {senders_, notifiers_} -> atomically (readQueueRec qr) >>= mapM jqDelete + delete = case queueStore ms of + MQStore st -> deleteQueue' st sq + st@JQStore {} -> atomically (readQueueRec qr) >>= mapM jqDelete where jqDelete q = E.uninterruptibleMask_ $ do - deleteQueueRef st (senderId q) senders_ - forM_ (notifier q) $ \NtfCreds {notifierId} -> deleteQueueRef st notifierId notifiers_ - deleteQueueDir sq + deleteQueueRef ms QRSender (senderId q) (senders_ st) + forM_ (notifier q) $ \NtfCreds {notifierId} -> deleteQueueRef ms QRNotifier notifierId (notifiers_ st) atomically $ writeTVar qr Nothing (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing) remove :: Maybe (JournalMsgQueue s) -> IO (Maybe (JournalMsgQueue s)) remove mq = do mapM_ closeMsgQueueHandles mq - removeQueueDirectory st rId + removeQueueDirectory ms rId pure mq closeMsgQueue :: JournalQueue s -> IO () diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index a72a99bc4..4ae989c5c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -68,10 +68,10 @@ instance MsgStoreClass STMMsgStore where closeMsgStore st = readTVarIO (storeLog $ queueStore st) >>= mapM_ closeStoreLog - activeMsgQueues = queues . queueStore - {-# INLINE activeMsgQueues #-} + withActiveMsgQueues = withQueues . queueStore + {-# INLINE withActiveMsgQueues #-} - withAllMsgQueues _ = withActiveMsgQueues + withAllMsgQueues _ = withQueues . queueStore {-# INLINE withAllMsgQueues #-} logQueueStates _ = pure () diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index c5fba950f..88d7954e7 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -17,7 +17,6 @@ module Simplex.Messaging.Server.MsgStore.Types where import Control.Concurrent.STM -import Control.Monad (foldM) import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) @@ -50,7 +49,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where type MsgQueue s = q | q -> s newMsgStore :: MsgStoreConfig s -> IO s closeMsgStore :: s -> IO () - activeMsgQueues :: s -> TMap RecipientId (StoreQueue s) + withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a withAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () @@ -107,13 +106,6 @@ getQueueRec st party qId = getQueue st party qId $>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q)) -withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (StoreQueue s -> IO a) -> IO a -withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty - where - run !acc q = do - r <- f q - pure $! acc <> r - getQueueMessages :: MsgStoreClass s => Bool -> s -> StoreQueue s -> ExceptT ErrorType IO [Message] getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs q . fst) {-# INLINE getQueueMessages #-} diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index f0ce8b8d6..d7810de4b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -17,6 +17,7 @@ module Simplex.Messaging.Server.QueueStore.STM ( STMQueueStore (..), newQueueStore, setStoreLog, + withQueues, addQueue', getQueue', secureQueue', @@ -63,6 +64,11 @@ newQueueStore = do setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO () setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) +withQueues :: Monoid a => STMQueueStore (StoreQueue s) -> (StoreQueue s -> IO a) -> IO a +withQueues st f = readTVarIO (queues st) >>= foldM run mempty + where + run !acc = fmap (acc <>) . f + addQueue' :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) addQueue' ms rId qr@QueueRec {senderId = sId, notifier} = (mkQueue ms rId qr >>= atomically . add) diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index fa3d5b3ae..92674b75d 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -23,7 +23,6 @@ import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Base64.URL as B64 import Data.Maybe (fromJust) import Data.Time.Clock.System (getSystemTime) import Simplex.Messaging.Crypto (pattern MaxLenBS) @@ -230,7 +229,7 @@ testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId createDirectoryIfMissing True dir state <- newMsgQueueState <$> newJournalId (random ms) withFile statePath WriteMode (`appendState` state) @@ -295,7 +294,7 @@ testMessageState ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True let dir = msgQueueDirectory ms rId - statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) + statePath = msgQueueStatePath dir rId write q s = writeMsg ms q True =<< mkMessage s mId1 <- runRight $ do diff --git a/tests/Test.hs b/tests/Test.hs index ede658d21..027b3ba82 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -61,9 +61,11 @@ main = do describe "Store log tests" storeLogTests describe "TRcvQueues tests" tRcvQueuesTests describe "Util tests" utilTests - describe "SMP server via TLS, hybrid message store" $ do + describe "SMP server via TLS, hybrid store" $ do describe "SMP syntax" $ serverSyntaxTests (transport @TLS) before (pure (transport @TLS, AMSType SMSHybrid)) serverTests + fdescribe "SMP server via TLS, journal message store" $ do + before (pure (transport @TLS, AMSType SMSJournal)) serverTests describe "SMP server via TLS, memory message store" $ before (pure (transport @TLS, AMSType SMSMemory)) serverTests -- xdescribe "SMP server via WebSockets" $ do