diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index f99d945e5..544f769b9 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1765,7 +1765,7 @@ processServerMessages = do stored'' <- getQueueSize ms q liftIO $ closeMsgQueue q pure (stored'', expired'') - processValidateQueue :: JournalStoreType s => JournalQueue s -> IO MessageStats + processValidateQueue :: JournalQueue s -> IO MessageStats processValidateQueue q = runExceptT (getQueueSize ms q) >>= \case Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1} diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 20ac37993..6e9849715 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -320,14 +320,14 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt AMS SMSJournal <$> newMsgStore (storeCfg SMSJournal storePath) (_, Nothing) -> putStrLn "Error: journal msg store requires that restore_messages is enabled in [STORE_LOG]" >> exitFailure where - storeCfg :: JournalStoreType s => SMSType s -> FilePath -> JournalStoreConfig s + storeCfg :: SMSType s -> FilePath -> JournalStoreConfig s storeCfg queueStoreType storePath = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} - loadStoreLog :: STMQueueStore s => s -> IO () - loadStoreLog store = forM_ storeLogFile $ \f -> do + loadStoreLog :: STMStoreClass s => s -> IO () + loadStoreLog st = forM_ storeLogFile $ \f -> do logInfo $ "restoring queues from file " <> T.pack f - sl <- readWriteQueueStore f store - setStoreLog store sl + sl <- readWriteQueueStore f st + setStoreLog (stmQueueStore st) sl getCredentials protocol creds = do files <- missingCreds unless (null files) $ do @@ -371,5 +371,5 @@ newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random pure ProxyAgent {smpAgent} -readWriteQueueStore :: MsgStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode) +readWriteQueueStore :: STMStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode) readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 64fcef625..3935ae0f9 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -108,6 +108,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir) "Messages not imported" ms <- newJournalMsgStore + -- TODO [queues] it should not load queues if queues are in journal readQueueStore storeLogFile ms msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration putStrLn "Import completed" @@ -127,6 +128,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) "Journal not exported" ms <- newJournalMsgStore + -- TODO [queues] it should not load queues if queues are in journal readQueueStore storeLogFile ms exportMessages True ms storeMsgsFilePath False putStrLn "Export completed" diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 8554eaa32..2074ef493 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -18,6 +18,7 @@ module Simplex.Messaging.Server.MsgStore.Journal ( JournalMsgStore (queueStore, random), + QueueStore (..), JournalQueue (queueDirectory), JournalMsgQueue (state), JournalStoreConfig (..), @@ -83,12 +84,7 @@ data JournalMsgStore s = JournalMsgStore } data QueueStore (s :: MSType) where - MQStore :: - { queues :: TMap RecipientId (JournalQueue 'MSHybrid), - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, - storeLog :: TVar (Maybe (StoreLog 'WriteMode)) - } -> QueueStore 'MSHybrid + MQStore :: STMQueueStore (JournalQueue 'MSHybrid) -> QueueStore 'MSHybrid -- maps store cached queues -- Nothing in map indicates that the queue doesn't exist JQStore :: @@ -232,16 +228,8 @@ logFileExt = ".log" newtype StoreIO (s :: MSType) a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) -instance STMQueueStore (JournalMsgStore 'MSHybrid) where - queues' = queues . queueStore - {-# INLINE queues' #-} - senders' = senders . queueStore - {-# INLINE senders' #-} - notifiers' = notifiers . queueStore - {-# INLINE notifiers' #-} - storeLog' = storeLog . queueStore - {-# INLINE storeLog' #-} - setStoreLog st sl = atomically $ writeTVar (storeLog' st) (Just sl) +instance STMStoreClass (JournalMsgStore 'MSHybrid) where + stmQueueStore JournalMsgStore {queueStore = MQStore st} = st mkQueue st rId qr = do lock <- atomically $ getMapLock (queueLocks st) rId makeQueue st lock rId qr @@ -275,11 +263,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where queueLocks :: TMap RecipientId Lock <- TM.emptyIO case queueStoreType config of SMSHybrid -> do - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - storeLog <- newTVarIO Nothing - let queueStore = MQStore {queues, senders, notifiers, storeLog} + queueStore <- MQStore <$> newQueueStore pure JournalMsgStore {config, random, queueLocks, queueStore} SMSJournal -> do queues_ <- TM.emptyIO @@ -288,15 +272,15 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where let queueStore = JQStore {queues_, senders_, notifiers_} pure JournalMsgStore {config, random, queueLocks, queueStore} - closeMsgStore st = case queueStore st of - MQStore {queues, storeLog} -> do - readTVarIO storeLog >>= mapM_ closeStoreLog - readTVarIO queues >>= mapM_ closeMsgQueue + closeMsgStore ms = case queueStore ms of + MQStore st -> do + readTVarIO (storeLog st) >>= mapM_ closeStoreLog + readTVarIO (queues st) >>= mapM_ closeMsgQueue JQStore {queues_} -> readTVarIO queues_ >>= mapM_ (mapM closeMsgQueue) - activeMsgQueues st = case queueStore st of - MQStore {queues} -> queues + activeMsgQueues ms = case queueStore ms of + MQStore st -> queues st JQStore {} -> undefined -- TODO [queues] -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. @@ -363,10 +347,10 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where {-# INLINE msgQueue_' #-} queueCounts :: JournalMsgStore s -> IO QueueCounts - queueCounts st = case queueStore st of - MQStore {queues, notifiers} -> do - queueCount <- M.size <$> readTVarIO queues - notifierCount <- M.size <$> readTVarIO notifiers + queueCounts ms = case queueStore ms of + MQStore st -> do + queueCount <- M.size <$> readTVarIO (queues st) + notifierCount <- M.size <$> readTVarIO (notifiers st) pure QueueCounts {queueCount, notifierCount} JQStore {queues_, notifiers_} -> do queueCount <- M.size <$> readTVarIO queues_ @@ -398,7 +382,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s)) getQueue st party qId = case queueStore st of - MQStore {} -> getQueue' st party qId + MQStore st' -> getQueue' st' party qId JQStore {queues_, senders_, notifiers_} -> isolateQueueId "getQueue" st qId $ maybe (Left AUTH) Right <$> case party of @@ -414,7 +398,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = case queueStore st of - MQStore {} -> secureQueue' st sq sKey + MQStore st' -> secureQueue' st' sq sKey JQStore {} -> isolateQueueRec sq "secureQueue" $ \q -> case senderKey q of Just k -> pure $ if sKey == k then Right () else Left AUTH @@ -422,7 +406,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where addQueueNotifier :: JournalMsgStore s -> JournalQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = case queueStore st of - MQStore {} -> addQueueNotifier' st sq ntfCreds + MQStore st' -> addQueueNotifier' st' sq ntfCreds JQStore {notifiers_} -> isolateQueueRec sq "addQueueNotifier" $ \q -> withLockMap (queueLocks st) nId "addQueueNotifierN" $ @@ -440,7 +424,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = case queueStore st of - MQStore {} -> deleteQueueNotifier' st sq + MQStore st' -> deleteQueueNotifier' st' sq JQStore {notifiers_} -> isolateQueueRec sq "deleteQueueNotifier" $ \q -> fmap Right $ forM (notifier q) $ \NtfCreds {notifierId = nId} -> @@ -451,14 +435,14 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ()) suspendQueue st sq = case queueStore st of - MQStore {} -> suspendQueue' st sq + MQStore st' -> suspendQueue' st' sq JQStore {} -> isolateQueueRec sq "suspendQueue" $ \q -> fmap Right $ storeQueue sq q {status = QueueOff} updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = case queueStore st of - MQStore {} -> updateQueueTime' st sq t + MQStore st' -> updateQueueTime' st' sq t JQStore {} -> isolateQueueRec sq "updateQueueTime" $ fmap Right . update where update q@QueueRec {updatedAt} @@ -915,7 +899,7 @@ deleteQueue_ st sq = qr = queueRec sq delete :: IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) delete = case queueStore st of - MQStore {} -> deleteQueue' st sq + MQStore st' -> deleteQueue' st' sq JQStore {senders_, notifiers_} -> atomically (readQueueRec qr) >>= mapM jqDelete where jqDelete q = E.uninterruptibleMask_ $ do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 024198923..a72a99bc4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -27,17 +27,11 @@ import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util ((<$$>), ($>>=)) -import System.IO (IOMode (..)) data STMMsgStore = STMMsgStore { storeConfig :: STMStoreConfig, - queues :: TMap RecipientId STMQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, - storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + queueStore :: STMQueueStore STMQueue } data STMQueue = STMQueue @@ -59,16 +53,8 @@ data STMStoreConfig = STMStoreConfig quota :: Int } -instance STMQueueStore STMMsgStore where - queues' = queues - {-# INLINE queues' #-} - senders' = senders - {-# INLINE senders' #-} - notifiers' = notifiers - {-# INLINE notifiers' #-} - storeLog' = storeLog - {-# INLINE storeLog' #-} - setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) +instance STMStoreClass STMMsgStore where + stmQueueStore = queueStore mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing instance MsgStoreClass STMMsgStore where @@ -78,16 +64,11 @@ instance MsgStoreClass STMMsgStore where type MsgStoreConfig STMMsgStore = STMStoreConfig newMsgStore :: STMStoreConfig -> IO STMMsgStore - newMsgStore storeConfig = do - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - storeLog <- newTVarIO Nothing - pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog} + newMsgStore storeConfig = STMMsgStore storeConfig <$> newQueueStore - closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog + closeMsgStore st = readTVarIO (storeLog $ queueStore st) >>= mapM_ closeStoreLog - activeMsgQueues = queues + activeMsgQueues = queues . queueStore {-# INLINE activeMsgQueues #-} withAllMsgQueues _ = withActiveMsgQueues @@ -107,30 +88,30 @@ instance MsgStoreClass STMMsgStore where {-# INLINE msgQueue_' #-} queueCounts :: STMMsgStore -> IO QueueCounts - queueCounts st = do - queueCount <- M.size <$> readTVarIO (queues st) - notifierCount <- M.size <$> readTVarIO (notifiers st) + queueCounts STMMsgStore {queueStore} = do + queueCount <- M.size <$> readTVarIO (queues queueStore) + notifierCount <- M.size <$> readTVarIO (notifiers queueStore) pure QueueCounts {queueCount, notifierCount} addQueue = addQueue' {-# INLINE addQueue #-} - getQueue = getQueue' + getQueue = getQueue' . queueStore {-# INLINE getQueue #-} - secureQueue = secureQueue' + secureQueue = secureQueue' . queueStore {-# INLINE secureQueue #-} - addQueueNotifier = addQueueNotifier' + addQueueNotifier = addQueueNotifier' . queueStore {-# INLINE addQueueNotifier #-} - deleteQueueNotifier = deleteQueueNotifier' + deleteQueueNotifier = deleteQueueNotifier' . queueStore {-# INLINE deleteQueueNotifier #-} - suspendQueue = suspendQueue' + suspendQueue = suspendQueue' . queueStore {-# INLINE suspendQueue #-} - updateQueueTime = updateQueueTime' + updateQueueTime = updateQueueTime' . queueStore {-# INLINE updateQueueTime #-} getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue @@ -157,10 +138,10 @@ instance MsgStoreClass STMMsgStore where Nothing -> pure (Nothing, 0) deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec) - deleteQueue ms q = fst <$$> deleteQueue' ms q + deleteQueue ms q = fst <$$> deleteQueue' (queueStore ms) q deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize) + deleteQueueSize ms q = deleteQueue' (queueStore ms) q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 58bc2cc39..c5fba950f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -32,12 +32,15 @@ import Simplex.Messaging.TMap (TMap) import Simplex.Messaging.Util ((<$$>), ($>>=)) import System.IO (IOMode (..)) -class MsgStoreClass s => STMQueueStore s where - queues' :: s -> TMap RecipientId (StoreQueue s) - senders' :: s -> TMap SenderId RecipientId - notifiers' :: s -> TMap NotifierId RecipientId - storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) - setStoreLog :: s -> StoreLog 'WriteMode -> IO () +data STMQueueStore q = STMQueueStore + { queues :: TMap RecipientId q, + senders :: TMap SenderId RecipientId, + notifiers :: TMap NotifierId RecipientId, + storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + } + +class MsgStoreClass s => STMStoreClass s where + stmQueueStore :: s -> STMQueueStore (StoreQueue s) mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s) class Monad (StoreMonad s) => MsgStoreClass s where diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 8836468b1..f0ce8b8d6 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -14,7 +14,10 @@ {-# LANGUAGE UndecidableInstances #-} module Simplex.Messaging.Server.QueueStore.STM - ( addQueue', + ( STMQueueStore (..), + newQueueStore, + setStoreLog, + addQueue', getQueue', secureQueue', addQueueNotifier', @@ -31,39 +34,57 @@ where import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Except import Data.Bitraversable (bimapM) +import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) import qualified Data.Text as T +import Data.Text.Encoding (decodeLatin1) +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.StoreLog import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (anyM, ifM, ($>>=), (<$$)) +import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>=), (<$$)) import System.IO import UnliftIO.STM -addQueue' :: STMQueueStore s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue' st rId qr@QueueRec {senderId = sId, notifier} = - (mkQueue st rId qr >>= atomically . add) +newQueueStore :: IO (STMQueueStore q) +newQueueStore = do + queues <- TM.emptyIO + senders <- TM.emptyIO + notifiers <- TM.emptyIO + storeLog <- newTVarIO Nothing + pure STMQueueStore {queues, senders, notifiers, storeLog} + +setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO () +setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) + +addQueue' :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) +addQueue' ms rId qr@QueueRec {senderId = sId, notifier} = + (mkQueue ms rId qr >>= atomically . add) $>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr) where + st = stmQueueStore ms add q = ifM hasId (pure $ Left DUPLICATE_) $ do - TM.insert rId q $ queues' st - TM.insert sId rId $ senders' st - forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st + TM.insert rId q $ queues st + TM.insert sId rId $ senders st + forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers st pure $ Right q - hasId = anyM [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] - hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier + hasId = anyM [TM.member rId $ queues st, TM.member sId $ senders st, hasNotifier] + hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers st)) notifier -getQueue' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) +getQueue' :: DirectParty p => STMQueueStore q -> SParty p -> QueueId -> IO (Either ErrorType q) getQueue' st party qId = maybe (Left AUTH) Right <$> case party of - SRecipient -> TM.lookupIO qId $ queues' st - SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st) - SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st) + SRecipient -> TM.lookupIO qId $ queues st + SSender -> TM.lookupIO qId (senders st) $>>= (`TM.lookupIO` queues st) + SNotifier -> TM.lookupIO qId (notifiers st) $>>= (`TM.lookupIO` queues st) -secureQueue' :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) +secureQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue' st sq sKey = atomically (readQueueRec qr $>>= secure) $>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey @@ -75,32 +96,32 @@ secureQueue' st sq sKey = writeTVar qr $ Just q {senderKey = Just sKey} pure $ Right () -addQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) +addQueueNotifier' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier' st sq ntfCreds@NtfCreds {notifierId = nId} = atomically (readQueueRec qr $>>= add) $>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) where qr = queueRec' sq rId = recipientId' sq - add q = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do - nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId + add q = ifM (TM.member nId (notifiers st)) (pure $ Left DUPLICATE_) $ do + nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers st) $> notifierId let !q' = q {notifier = Just ntfCreds} writeTVar qr $ Just q' - TM.insert nId rId $ notifiers' st + TM.insert nId rId $ notifiers st pure $ Right nId_ -deleteQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) +deleteQueueNotifier' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier' st sq = atomically (readQueueRec qr >>= mapM delete) $>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq) where qr = queueRec' sq delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do - TM.delete notifierId $ notifiers' st + TM.delete notifierId $ notifiers st writeTVar qr $! Just q {notifier = Nothing} pure notifierId -suspendQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +suspendQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType ()) suspendQueue' st sq = atomically (readQueueRec qr >>= mapM suspend) $>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq) @@ -108,7 +129,7 @@ suspendQueue' st sq = qr = queueRec' sq suspend q = writeTVar qr $! Just q {status = QueueOff} -updateQueueTime' :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) +updateQueueTime' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' where qr = queueRec' sq @@ -121,7 +142,7 @@ updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t) | otherwise = pure $ Right q -deleteQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) +deleteQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) deleteQueue' st sq = atomically (readQueueRec qr >>= mapM delete) $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq) @@ -130,8 +151,8 @@ deleteQueue' st sq = qr = queueRec' sq delete q = do writeTVar qr Nothing - TM.delete (senderId q) $ senders' st - forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st + TM.delete (senderId q) $ senders st + forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st pure q readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec) @@ -148,5 +169,37 @@ withLog' name sl action = where err = name <> ", withLog, " <> show e -withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) -withLog name = withLog' name . storeLog' +withLog :: String -> STMQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) +withLog name = withLog' name . storeLog + +readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO () +readQueueStore f ms = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines + where + st = stmQueueStore ms + processLine :: LB.ByteString -> IO () + processLine s' = either printError procLogRecord (strDecode s) + where + s = LB.toStrict s' + procLogRecord :: StoreLogRecord -> IO () + procLogRecord = \case + CreateQueue rId q -> addQueue' ms rId q >>= qError rId "CreateQueue" + SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue' st q sKey + AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier' st q ntfCreds + SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue' st + DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue' st + DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier' st + UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime' st q t + printError :: String -> IO () + printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s + withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () + withQueue qId op a = runExceptT go >>= qError qId op + where + go = do + q <- ExceptT $ getQueue' st SRecipient qId + liftIO (readTVarIO $ queueRec' q) >>= \case + Nothing -> logWarn $ logPfx qId op <> "already deleted" + Just _ -> void $ ExceptT $ a q + qError qId op = \case + Left e -> logError $ logPfx qId op <> tshow e + Right _ -> pure () + logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", " diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 4cc55e978..fa47978be 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -26,7 +26,6 @@ module Simplex.Messaging.Server.StoreLog logUpdateQueueTime, readWriteStoreLog, writeQueueStore, - readQueueStore, ) where @@ -35,14 +34,10 @@ import Control.Concurrent.STM import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Except import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Lazy.Char8 as LB import qualified Data.Map.Strict as M import qualified Data.Text as T -import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (getCurrentTime) import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IO (catchAny) @@ -226,42 +221,12 @@ readWriteStoreLog readStore writeStore f st = renameFile tempBackup timedBackup logInfo $ "original state preserved as " <> T.pack timedBackup -writeQueueStore :: MsgStoreClass s => StoreLog 'WriteMode -> s -> IO () -writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs +writeQueueStore :: STMStoreClass s => StoreLog 'WriteMode -> s -> IO () +writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs where + qs = queues $ stmQueueStore st writeQueue (rId, q) = readTVarIO (queueRec' q) >>= \case Just q' -> when (active q') $ logCreateQueue s rId q' -- TODO we should log suspended queues when we use them - Nothing -> atomically $ TM.delete rId $ activeMsgQueues st + Nothing -> atomically $ TM.delete rId qs active QueueRec {status} = status == QueueActive - -readQueueStore :: forall s. MsgStoreClass s => FilePath -> s -> IO () -readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines - where - processLine :: LB.ByteString -> IO () - processLine s' = either printError procLogRecord (strDecode s) - where - s = LB.toStrict s' - procLogRecord :: StoreLogRecord -> IO () - procLogRecord = \case - CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue" - SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey - AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds - SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st - DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st - DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st - UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t - printError :: String -> IO () - printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s - withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () - withQueue qId op a = runExceptT go >>= qError qId op - where - go = do - q <- ExceptT $ getQueue st SRecipient qId - liftIO (readTVarIO $ queueRec' q) >>= \case - Nothing -> logWarn $ logPfx qId op <> "already deleted" - Just _ -> void $ ExceptT $ a q - qError qId op = \case - Left e -> logError $ logPfx qId op <> tshow e - Right _ -> pure () - logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", " diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index beed5811a..fa3d5b3ae 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -57,12 +57,12 @@ msgStoreTests = do it "should create write file when missing" testWriteFileMissing it "should create read file when read and write files are missing" testReadAndWriteFilesMissing where - someMsgStoreTests :: STMQueueStore s => SpecWith s + someMsgStoreTests :: STMStoreClass s => SpecWith s someMsgStoreTests = do it "should get queue and store/read messages" testGetQueue it "should not fail on EOF when changing read journal" testChangeReadJournal -withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO () +withMsgStore :: STMStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO () withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore testSMTStoreConfig :: STMStoreConfig @@ -116,7 +116,7 @@ testNewQueueRec g sndSecure = do } pure (rId, qr) -testGetQueue :: STMQueueStore s => s -> IO () +testGetQueue :: STMStoreClass s => s -> IO () testGetQueue ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -158,7 +158,7 @@ testGetQueue ms = do (Nothing, Nothing) <- tryDelPeekMsg ms q mId8 void $ ExceptT $ deleteQueue ms q -testChangeReadJournal :: STMQueueStore s => s -> IO () +testChangeReadJournal :: STMStoreClass s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -177,7 +177,7 @@ testChangeReadJournal ms = do (Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5 void $ ExceptT $ deleteQueue ms q -testExportImportStore :: JournalStoreType s => JournalMsgStore s -> IO () +testExportImportStore :: JournalMsgStore 'MSHybrid -> IO () testExportImportStore ms = do g <- C.newRandom (rId1, qr1) <- testNewQueueRec g True @@ -225,7 +225,7 @@ testExportImportStore ms = do exportMessages False stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) -testQueueState :: JournalStoreType s => JournalMsgStore s -> IO () +testQueueState :: JournalMsgStore s -> IO () testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 104ecbdec..d4360ab0d 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -109,4 +109,4 @@ testSMPStoreLog testSuite tests = ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile compacted' `shouldBe` compacted storeState :: JournalMsgStore 'MSHybrid -> IO (M.Map RecipientId QueueRec) - storeState st = M.mapMaybe id <$> (readTVarIO (queues' st) >>= mapM (readTVarIO . queueRec')) + storeState st = M.mapMaybe id <$> (readTVarIO (queues $ stmQueueStore st) >>= mapM (readTVarIO . queueRec'))