This commit is contained in:
Evgeny Poberezkin
2024-12-15 16:38:09 +00:00
parent a319a377a1
commit 14aed2dda7
10 changed files with 149 additions and 161 deletions
+1 -1
View File
@@ -1765,7 +1765,7 @@ processServerMessages = do
stored'' <- getQueueSize ms q
liftIO $ closeMsgQueue q
pure (stored'', expired'')
processValidateQueue :: JournalStoreType s => JournalQueue s -> IO MessageStats
processValidateQueue :: JournalQueue s -> IO MessageStats
processValidateQueue q =
runExceptT (getQueueSize ms q) >>= \case
Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1}
+6 -6
View File
@@ -320,14 +320,14 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
AMS SMSJournal <$> newMsgStore (storeCfg SMSJournal storePath)
(_, Nothing) -> putStrLn "Error: journal msg store requires that restore_messages is enabled in [STORE_LOG]" >> exitFailure
where
storeCfg :: JournalStoreType s => SMSType s -> FilePath -> JournalStoreConfig s
storeCfg :: SMSType s -> FilePath -> JournalStoreConfig s
storeCfg queueStoreType storePath =
JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
loadStoreLog :: STMQueueStore s => s -> IO ()
loadStoreLog store = forM_ storeLogFile $ \f -> do
loadStoreLog :: STMStoreClass s => s -> IO ()
loadStoreLog st = forM_ storeLogFile $ \f -> do
logInfo $ "restoring queues from file " <> T.pack f
sl <- readWriteQueueStore f store
setStoreLog store sl
sl <- readWriteQueueStore f st
setStoreLog (stmQueueStore st) sl
getCredentials protocol creds = do
files <- missingCreds
unless (null files) $ do
@@ -371,5 +371,5 @@ newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}
readWriteQueueStore :: MsgStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: STMStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore
+2
View File
@@ -108,6 +108,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir)
"Messages not imported"
ms <- newJournalMsgStore
-- TODO [queues] it should not load queues if queues are in journal
readQueueStore storeLogFile ms
msgStats <- importMessages True ms storeMsgsFilePath Nothing -- no expiration
putStrLn "Import completed"
@@ -127,6 +128,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
"Journal not exported"
ms <- newJournalMsgStore
-- TODO [queues] it should not load queues if queues are in journal
readQueueStore storeLogFile ms
exportMessages True ms storeMsgsFilePath False
putStrLn "Export completed"
@@ -18,6 +18,7 @@
module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (queueStore, random),
QueueStore (..),
JournalQueue (queueDirectory),
JournalMsgQueue (state),
JournalStoreConfig (..),
@@ -83,12 +84,7 @@ data JournalMsgStore s = JournalMsgStore
}
data QueueStore (s :: MSType) where
MQStore ::
{ queues :: TMap RecipientId (JournalQueue 'MSHybrid),
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
} -> QueueStore 'MSHybrid
MQStore :: STMQueueStore (JournalQueue 'MSHybrid) -> QueueStore 'MSHybrid
-- maps store cached queues
-- Nothing in map indicates that the queue doesn't exist
JQStore ::
@@ -232,16 +228,8 @@ logFileExt = ".log"
newtype StoreIO (s :: MSType) a = StoreIO {unStoreIO :: IO a}
deriving newtype (Functor, Applicative, Monad)
instance STMQueueStore (JournalMsgStore 'MSHybrid) where
queues' = queues . queueStore
{-# INLINE queues' #-}
senders' = senders . queueStore
{-# INLINE senders' #-}
notifiers' = notifiers . queueStore
{-# INLINE notifiers' #-}
storeLog' = storeLog . queueStore
{-# INLINE storeLog' #-}
setStoreLog st sl = atomically $ writeTVar (storeLog' st) (Just sl)
instance STMStoreClass (JournalMsgStore 'MSHybrid) where
stmQueueStore JournalMsgStore {queueStore = MQStore st} = st
mkQueue st rId qr = do
lock <- atomically $ getMapLock (queueLocks st) rId
makeQueue st lock rId qr
@@ -275,11 +263,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
queueLocks :: TMap RecipientId Lock <- TM.emptyIO
case queueStoreType config of
SMSHybrid -> do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
let queueStore = MQStore {queues, senders, notifiers, storeLog}
queueStore <- MQStore <$> newQueueStore
pure JournalMsgStore {config, random, queueLocks, queueStore}
SMSJournal -> do
queues_ <- TM.emptyIO
@@ -288,15 +272,15 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
let queueStore = JQStore {queues_, senders_, notifiers_}
pure JournalMsgStore {config, random, queueLocks, queueStore}
closeMsgStore st = case queueStore st of
MQStore {queues, storeLog} -> do
readTVarIO storeLog >>= mapM_ closeStoreLog
readTVarIO queues >>= mapM_ closeMsgQueue
closeMsgStore ms = case queueStore ms of
MQStore st -> do
readTVarIO (storeLog st) >>= mapM_ closeStoreLog
readTVarIO (queues st) >>= mapM_ closeMsgQueue
JQStore {queues_} ->
readTVarIO queues_ >>= mapM_ (mapM closeMsgQueue)
activeMsgQueues st = case queueStore st of
MQStore {queues} -> queues
activeMsgQueues ms = case queueStore ms of
MQStore st -> queues st
JQStore {} -> undefined -- TODO [queues]
-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
@@ -363,10 +347,10 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
{-# INLINE msgQueue_' #-}
queueCounts :: JournalMsgStore s -> IO QueueCounts
queueCounts st = case queueStore st of
MQStore {queues, notifiers} -> do
queueCount <- M.size <$> readTVarIO queues
notifierCount <- M.size <$> readTVarIO notifiers
queueCounts ms = case queueStore ms of
MQStore st -> do
queueCount <- M.size <$> readTVarIO (queues st)
notifierCount <- M.size <$> readTVarIO (notifiers st)
pure QueueCounts {queueCount, notifierCount}
JQStore {queues_, notifiers_} -> do
queueCount <- M.size <$> readTVarIO queues_
@@ -398,7 +382,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s))
getQueue st party qId = case queueStore st of
MQStore {} -> getQueue' st party qId
MQStore st' -> getQueue' st' party qId
JQStore {queues_, senders_, notifiers_} ->
isolateQueueId "getQueue" st qId $
maybe (Left AUTH) Right <$> case party of
@@ -414,7 +398,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey = case queueStore st of
MQStore {} -> secureQueue' st sq sKey
MQStore st' -> secureQueue' st' sq sKey
JQStore {} ->
isolateQueueRec sq "secureQueue" $ \q -> case senderKey q of
Just k -> pure $ if sKey == k then Right () else Left AUTH
@@ -422,7 +406,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
addQueueNotifier :: JournalMsgStore s -> JournalQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = case queueStore st of
MQStore {} -> addQueueNotifier' st sq ntfCreds
MQStore st' -> addQueueNotifier' st' sq ntfCreds
JQStore {notifiers_} ->
isolateQueueRec sq "addQueueNotifier" $ \q ->
withLockMap (queueLocks st) nId "addQueueNotifierN" $
@@ -440,7 +424,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq = case queueStore st of
MQStore {} -> deleteQueueNotifier' st sq
MQStore st' -> deleteQueueNotifier' st' sq
JQStore {notifiers_} ->
isolateQueueRec sq "deleteQueueNotifier" $ \q ->
fmap Right $ forM (notifier q) $ \NtfCreds {notifierId = nId} ->
@@ -451,14 +435,14 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ())
suspendQueue st sq = case queueStore st of
MQStore {} -> suspendQueue' st sq
MQStore st' -> suspendQueue' st' sq
JQStore {} ->
isolateQueueRec sq "suspendQueue" $ \q ->
fmap Right $ storeQueue sq q {status = QueueOff}
updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = case queueStore st of
MQStore {} -> updateQueueTime' st sq t
MQStore st' -> updateQueueTime' st' sq t
JQStore {} -> isolateQueueRec sq "updateQueueTime" $ fmap Right . update
where
update q@QueueRec {updatedAt}
@@ -915,7 +899,7 @@ deleteQueue_ st sq =
qr = queueRec sq
delete :: IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
delete = case queueStore st of
MQStore {} -> deleteQueue' st sq
MQStore st' -> deleteQueue' st' sq
JQStore {senders_, notifiers_} -> atomically (readQueueRec qr) >>= mapM jqDelete
where
jqDelete q = E.uninterruptibleMask_ $ do
+17 -36
View File
@@ -27,17 +27,11 @@ import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util ((<$$>), ($>>=))
import System.IO (IOMode (..))
data STMMsgStore = STMMsgStore
{ storeConfig :: STMStoreConfig,
queues :: TMap RecipientId STMQueue,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
queueStore :: STMQueueStore STMQueue
}
data STMQueue = STMQueue
@@ -59,16 +53,8 @@ data STMStoreConfig = STMStoreConfig
quota :: Int
}
instance STMQueueStore STMMsgStore where
queues' = queues
{-# INLINE queues' #-}
senders' = senders
{-# INLINE senders' #-}
notifiers' = notifiers
{-# INLINE notifiers' #-}
storeLog' = storeLog
{-# INLINE storeLog' #-}
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
instance STMStoreClass STMMsgStore where
stmQueueStore = queueStore
mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
instance MsgStoreClass STMMsgStore where
@@ -78,16 +64,11 @@ instance MsgStoreClass STMMsgStore where
type MsgStoreConfig STMMsgStore = STMStoreConfig
newMsgStore :: STMStoreConfig -> IO STMMsgStore
newMsgStore storeConfig = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog}
newMsgStore storeConfig = STMMsgStore storeConfig <$> newQueueStore
closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog
closeMsgStore st = readTVarIO (storeLog $ queueStore st) >>= mapM_ closeStoreLog
activeMsgQueues = queues
activeMsgQueues = queues . queueStore
{-# INLINE activeMsgQueues #-}
withAllMsgQueues _ = withActiveMsgQueues
@@ -107,30 +88,30 @@ instance MsgStoreClass STMMsgStore where
{-# INLINE msgQueue_' #-}
queueCounts :: STMMsgStore -> IO QueueCounts
queueCounts st = do
queueCount <- M.size <$> readTVarIO (queues st)
notifierCount <- M.size <$> readTVarIO (notifiers st)
queueCounts STMMsgStore {queueStore} = do
queueCount <- M.size <$> readTVarIO (queues queueStore)
notifierCount <- M.size <$> readTVarIO (notifiers queueStore)
pure QueueCounts {queueCount, notifierCount}
addQueue = addQueue'
{-# INLINE addQueue #-}
getQueue = getQueue'
getQueue = getQueue' . queueStore
{-# INLINE getQueue #-}
secureQueue = secureQueue'
secureQueue = secureQueue' . queueStore
{-# INLINE secureQueue #-}
addQueueNotifier = addQueueNotifier'
addQueueNotifier = addQueueNotifier' . queueStore
{-# INLINE addQueueNotifier #-}
deleteQueueNotifier = deleteQueueNotifier'
deleteQueueNotifier = deleteQueueNotifier' . queueStore
{-# INLINE deleteQueueNotifier #-}
suspendQueue = suspendQueue'
suspendQueue = suspendQueue' . queueStore
{-# INLINE suspendQueue #-}
updateQueueTime = updateQueueTime'
updateQueueTime = updateQueueTime' . queueStore
{-# INLINE updateQueueTime #-}
getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue
@@ -157,10 +138,10 @@ instance MsgStoreClass STMMsgStore where
Nothing -> pure (Nothing, 0)
deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteQueue' ms q
deleteQueue ms q = fst <$$> deleteQueue' (queueStore ms) q
deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize)
deleteQueueSize ms q = deleteQueue' (queueStore ms) q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
@@ -32,12 +32,15 @@ import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Util ((<$$>), ($>>=))
import System.IO (IOMode (..))
class MsgStoreClass s => STMQueueStore s where
queues' :: s -> TMap RecipientId (StoreQueue s)
senders' :: s -> TMap SenderId RecipientId
notifiers' :: s -> TMap NotifierId RecipientId
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
setStoreLog :: s -> StoreLog 'WriteMode -> IO ()
data STMQueueStore q = STMQueueStore
{ queues :: TMap RecipientId q,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
class MsgStoreClass s => STMStoreClass s where
stmQueueStore :: s -> STMQueueStore (StoreQueue s)
mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s)
class Monad (StoreMonad s) => MsgStoreClass s where
+81 -28
View File
@@ -14,7 +14,10 @@
{-# LANGUAGE UndecidableInstances #-}
module Simplex.Messaging.Server.QueueStore.STM
( addQueue',
( STMQueueStore (..),
newQueueStore,
setStoreLog,
addQueue',
getQueue',
secureQueue',
addQueueNotifier',
@@ -31,39 +34,57 @@ where
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Bitraversable (bimapM)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Functor (($>))
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.StoreLog
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (anyM, ifM, ($>>=), (<$$))
import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>=), (<$$))
import System.IO
import UnliftIO.STM
addQueue' :: STMQueueStore s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue' st rId qr@QueueRec {senderId = sId, notifier} =
(mkQueue st rId qr >>= atomically . add)
newQueueStore :: IO (STMQueueStore q)
newQueueStore = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMQueueStore {queues, senders, notifiers, storeLog}
setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
addQueue' :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue' ms rId qr@QueueRec {senderId = sId, notifier} =
(mkQueue ms rId qr >>= atomically . add)
$>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr)
where
st = stmQueueStore ms
add q = ifM hasId (pure $ Left DUPLICATE_) $ do
TM.insert rId q $ queues' st
TM.insert sId rId $ senders' st
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st
TM.insert rId q $ queues st
TM.insert sId rId $ senders st
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers st
pure $ Right q
hasId = anyM [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier
hasId = anyM [TM.member rId $ queues st, TM.member sId $ senders st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers st)) notifier
getQueue' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
getQueue' :: DirectParty p => STMQueueStore q -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue' st party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId $ queues' st
SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st)
SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st)
SRecipient -> TM.lookupIO qId $ queues st
SSender -> TM.lookupIO qId (senders st) $>>= (`TM.lookupIO` queues st)
SNotifier -> TM.lookupIO qId (notifiers st) $>>= (`TM.lookupIO` queues st)
secureQueue' :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue' st sq sKey =
atomically (readQueueRec qr $>>= secure)
$>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey
@@ -75,32 +96,32 @@ secureQueue' st sq sKey =
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right ()
addQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier' st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
qr = queueRec' sq
rId = recipientId' sq
add q = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
add q = ifM (TM.member nId (notifiers st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers st) $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId $ notifiers' st
TM.insert nId rId $ notifiers st
pure $ Right nId_
deleteQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq)
where
qr = queueRec' sq
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers' st
TM.delete notifierId $ notifiers st
writeTVar qr $! Just q {notifier = Nothing}
pure notifierId
suspendQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue' st sq =
atomically (readQueueRec qr >>= mapM suspend)
$>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq)
@@ -108,7 +129,7 @@ suspendQueue' st sq =
qr = queueRec' sq
suspend q = writeTVar qr $! Just q {status = QueueOff}
updateQueueTime' :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
where
qr = queueRec' sq
@@ -121,7 +142,7 @@ updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t)
| otherwise = pure $ Right q
deleteQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' :: MsgStoreClass s => STMQueueStore (StoreQueue s) -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq)
@@ -130,8 +151,8 @@ deleteQueue' st sq =
qr = queueRec' sq
delete q = do
writeTVar qr Nothing
TM.delete (senderId q) $ senders' st
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st
TM.delete (senderId q) $ senders st
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st
pure q
readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec)
@@ -148,5 +169,37 @@ withLog' name sl action =
where
err = name <> ", withLog, " <> show e
withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog'
withLog :: String -> STMQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog
readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO ()
readQueueStore f ms = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines
where
st = stmQueueStore ms
processLine :: LB.ByteString -> IO ()
processLine s' = either printError procLogRecord (strDecode s)
where
s = LB.toStrict s'
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue rId q -> addQueue' ms rId q >>= qError rId "CreateQueue"
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue' st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier' st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue' st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue' st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier' st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime' st q t
printError :: String -> IO ()
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO ()
withQueue qId op a = runExceptT go >>= qError qId op
where
go = do
q <- ExceptT $ getQueue' st SRecipient qId
liftIO (readTVarIO $ queueRec' q) >>= \case
Nothing -> logWarn $ logPfx qId op <> "already deleted"
Just _ -> void $ ExceptT $ a q
qError qId op = \case
Left e -> logError $ logPfx qId op <> tshow e
Right _ -> pure ()
logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", "
+4 -39
View File
@@ -26,7 +26,6 @@ module Simplex.Messaging.Server.StoreLog
logUpdateQueueTime,
readWriteStoreLog,
writeQueueStore,
readQueueStore,
)
where
@@ -35,14 +34,10 @@ import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import GHC.IO (catchAny)
@@ -226,42 +221,12 @@ readWriteStoreLog readStore writeStore f st =
renameFile tempBackup timedBackup
logInfo $ "original state preserved as " <> T.pack timedBackup
writeQueueStore :: MsgStoreClass s => StoreLog 'WriteMode -> s -> IO ()
writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs
writeQueueStore :: STMStoreClass s => StoreLog 'WriteMode -> s -> IO ()
writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs
where
qs = queues $ stmQueueStore st
writeQueue (rId, q) =
readTVarIO (queueRec' q) >>= \case
Just q' -> when (active q') $ logCreateQueue s rId q' -- TODO we should log suspended queues when we use them
Nothing -> atomically $ TM.delete rId $ activeMsgQueues st
Nothing -> atomically $ TM.delete rId qs
active QueueRec {status} = status == QueueActive
readQueueStore :: forall s. MsgStoreClass s => FilePath -> s -> IO ()
readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines
where
processLine :: LB.ByteString -> IO ()
processLine s' = either printError procLogRecord (strDecode s)
where
s = LB.toStrict s'
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue"
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
printError :: String -> IO ()
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO ()
withQueue qId op a = runExceptT go >>= qError qId op
where
go = do
q <- ExceptT $ getQueue st SRecipient qId
liftIO (readTVarIO $ queueRec' q) >>= \case
Nothing -> logWarn $ logPfx qId op <> "already deleted"
Just _ -> void $ ExceptT $ a q
qError qId op = \case
Left e -> logError $ logPfx qId op <> tshow e
Right _ -> pure ()
logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", "
+6 -6
View File
@@ -57,12 +57,12 @@ msgStoreTests = do
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
where
someMsgStoreTests :: STMQueueStore s => SpecWith s
someMsgStoreTests :: STMStoreClass s => SpecWith s
someMsgStoreTests = do
it "should get queue and store/read messages" testGetQueue
it "should not fail on EOF when changing read journal" testChangeReadJournal
withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore :: STMStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore
testSMTStoreConfig :: STMStoreConfig
@@ -116,7 +116,7 @@ testNewQueueRec g sndSecure = do
}
pure (rId, qr)
testGetQueue :: STMQueueStore s => s -> IO ()
testGetQueue :: STMStoreClass s => s -> IO ()
testGetQueue ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
@@ -158,7 +158,7 @@ testGetQueue ms = do
(Nothing, Nothing) <- tryDelPeekMsg ms q mId8
void $ ExceptT $ deleteQueue ms q
testChangeReadJournal :: STMQueueStore s => s -> IO ()
testChangeReadJournal :: STMStoreClass s => s -> IO ()
testChangeReadJournal ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
@@ -177,7 +177,7 @@ testChangeReadJournal ms = do
(Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5
void $ ExceptT $ deleteQueue ms q
testExportImportStore :: JournalStoreType s => JournalMsgStore s -> IO ()
testExportImportStore :: JournalMsgStore 'MSHybrid -> IO ()
testExportImportStore ms = do
g <- C.newRandom
(rId1, qr1) <- testNewQueueRec g True
@@ -225,7 +225,7 @@ testExportImportStore ms = do
exportMessages False stmStore testStoreMsgsFile False
(B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak"))
testQueueState :: JournalStoreType s => JournalMsgStore s -> IO ()
testQueueState :: JournalMsgStore s -> IO ()
testQueueState ms = do
g <- C.newRandom
rId <- EntityId <$> atomically (C.randomBytes 24 g)
+1 -1
View File
@@ -109,4 +109,4 @@ testSMPStoreLog testSuite tests =
([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile
compacted' `shouldBe` compacted
storeState :: JournalMsgStore 'MSHybrid -> IO (M.Map RecipientId QueueRec)
storeState st = M.mapMaybe id <$> (readTVarIO (queues' st) >>= mapM (readTVarIO . queueRec'))
storeState st = M.mapMaybe id <$> (readTVarIO (queues $ stmQueueStore st) >>= mapM (readTVarIO . queueRec'))