From 8b656d7dc567aa7648ab1d71437dfd8cdc750381 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 12 Dec 2024 13:45:18 +0000 Subject: [PATCH] partial implementation --- .../Messaging/Server/MsgStore/Journal.hs | 100 ++++++++++++++---- src/Simplex/Messaging/Server/MsgStore/STM.hs | 2 +- .../Messaging/Server/MsgStore/Types.hs | 2 +- .../Messaging/Server/QueueStore/STM.hs | 9 +- 4 files changed, 87 insertions(+), 26 deletions(-) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 78be1980f..05690e567 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -51,7 +51,7 @@ import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isNothing) +import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) @@ -91,11 +91,11 @@ data QueueStore (s :: MSType) where } -> QueueStore 'MSMemory -- maps store cached queues -- Nothing in map indicates that the queue doesn't exist - -- JQStore :: - -- { queues_ :: TMap RecipientId (Maybe (JournalQueue 'MSJournal)), - -- senders_ :: TMap SenderId (Maybe RecipientId), - -- notifiers_ :: TMap NotifierId (Maybe RecipientId) - -- } -> QueueStore 'MSJournal + JQStore :: + { queues_ :: TMap RecipientId (Maybe (JournalQueue 'MSJournal)), + senders_ :: TMap SenderId (Maybe RecipientId), + notifiers_ :: TMap NotifierId (Maybe RecipientId) + } -> QueueStore 'MSJournal data JournalStoreConfig s = JournalStoreConfig { storePath :: FilePath, @@ -224,6 +224,9 @@ instance JournalTypeI t => StrEncoding (JournalState t) where queueLogFileName :: String queueLogFileName = "queue_state" +queueRecFileName :: String +queueRecFileName = "queue_rec" + msgLogFileName :: String msgLogFileName = "messages" @@ -239,12 +242,16 @@ instance STMQueueStore (JournalMsgStore 'MSMemory) where notifiers' = notifiers . queueStore storeLog' = storeLog . queueStore mkQueue st qr = do - lock <- getMapLock (queueLocks st) $ recipientId qr - q <- newTVar $ Just qr - mq <- newTVar Nothing - activeAt <- newTVar 0 - isEmpty <- newTVar Nothing - pure $ JournalQueue lock q mq activeAt isEmpty + lock <- atomically $ getMapLock (queueLocks st) $ recipientId qr + makeQueue lock qr + +makeQueue :: Lock -> QueueRec -> IO (JournalQueue s) +makeQueue lock qr = do + q <- newTVarIO $ Just qr + mq <- newTVarIO Nothing + activeAt <- newTVarIO 0 + isEmpty <- newTVarIO Nothing + pure $ JournalQueue lock q mq activeAt isEmpty instance MsgStoreClass (JournalMsgStore s) where type StoreMonad (JournalMsgStore s) = StoreIO s @@ -264,20 +271,27 @@ instance MsgStoreClass (JournalMsgStore s) where storeLog <- newTVarIO Nothing let queueStore = MQStore {queues, senders, notifiers, storeLog} pure JournalMsgStore {config, random, queueLocks, queueStore} - SMSJournal -> undefined + SMSJournal -> do + queues_ <- TM.emptyIO + senders_ <- TM.emptyIO + notifiers_ <- TM.emptyIO + let queueStore = JQStore {queues_, senders_, notifiers_} + pure JournalMsgStore {config, random, queueLocks, queueStore} setStoreLog :: JournalMsgStore s -> StoreLog 'WriteMode -> IO () setStoreLog st sl = case queueStore st of MQStore {storeLog} -> atomically $ writeTVar storeLog (Just sl) + JQStore {} -> undefined closeMsgStore st = case queueStore st of MQStore {queues, storeLog} -> do readTVarIO storeLog >>= mapM_ closeStoreLog readTVarIO queues >>= mapM_ closeMsgQueue + JQStore {} -> undefined activeMsgQueues st = case queueStore st of MQStore {queues} -> queues - {-# INLINE activeMsgQueues #-} + JQStore {} -> undefined -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. @@ -345,38 +359,80 @@ instance MsgStoreClass (JournalMsgStore s) where queueCount <- M.size <$> readTVarIO queues notifierCount <- M.size <$> readTVarIO notifiers pure QueueCounts {queueCount, notifierCount} + JQStore {} -> undefined addQueue :: JournalMsgStore s -> QueueRec -> IO (Either ErrorType (JournalQueue s)) - addQueue st qr = case queueStore st of + addQueue st@JournalMsgStore {queueLocks = ls} qr@QueueRec {recipientId = rId, senderId = sId, notifier} = case queueStore st of MQStore {} -> addQueue' st qr + JQStore {queues_, senders_, notifiers_} -> do + lock <- atomically $ getMapLock ls $ recipientId qr + tryStore "addQueue" rId $ + withLock' lock "addQueue" $ withLockMap ls sId "addQueueS" $ withNotifierLock $ + ifM hasAnyId (pure $ Left DUPLICATE_) $ E.uninterruptibleMask_ $ do + q <- makeQueue lock qr + atomically $ TM.insert rId (Just q) queues_ + atomically $ TM.insert sId (Just rId) senders_ + storeQueue queuePath qr + saveQueueRef sId rId + forM_ notifier $ \NtfCreds {notifierId} -> do + atomically $ TM.insert notifierId (Just rId) notifiers_ + saveQueueRef notifierId rId + pure $ Right q + where + dir = msgQueueDirectory st rId + queuePath = queueRecPath dir $ B.unpack (strEncode rId) + storeQueue _ _ = pure () -- TODO + saveQueueRef _ _ = pure () -- TODO + hasAnyId = foldM (fmap . (||)) False [hasId rId queues_, hasId sId senders_, withNotifier (`hasId` notifiers_), hasDir rId, hasDir sId, withNotifier hasDir] + withNotifier p = maybe (pure False) (\NtfCreds {notifierId} -> p notifierId) notifier + withNotifierLock a = maybe a (\NtfCreds {notifierId} -> withLockMap ls notifierId "addQueueN" a) notifier + hasId :: EntityId -> TMap EntityId (Maybe a) -> IO Bool + hasId qId m = maybe False isJust <$> atomically (TM.lookup qId m) + hasDir qId = doesDirectoryExist $ msgQueueDirectory st qId getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s)) getQueue st party qId = case queueStore st of MQStore {} -> getQueue' st party qId + JQStore {queues_, senders_, notifiers_} -> maybe (Left AUTH) Right <$> case party of + SRecipient -> getQueue_ qId + SSender -> getQueueRef senders_ $>>= getQueue_ + SNotifier -> getQueueRef notifiers_ $>>= getQueue_ + where + getQueue_ rId = TM.lookupIO rId queues_ >>= maybe (loadQueue rId) pure + getQueueRef :: TMap EntityId (Maybe RecipientId) -> IO (Maybe RecipientId) + getQueueRef m = TM.lookupIO qId m >>= maybe (loadQueueRef m) pure + loadQueue _rId = undefined -- TODO load, cache, return queue + loadQueueRef _m = undefined -- TODO load, cache, return queue ID getQueueRec :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s, QueueRec)) getQueueRec st party qId = case queueStore st of MQStore {} -> getQueueRec' st party qId + JQStore {} -> undefined secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = case queueStore st of MQStore {} -> secureQueue' st sq sKey + JQStore {} -> undefined addQueueNotifier :: JournalMsgStore s -> JournalQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier st sq ntfCreds = case queueStore st of MQStore {} -> addQueueNotifier' st sq ntfCreds + JQStore {} -> undefined deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = case queueStore st of MQStore {} -> deleteQueueNotifier' st sq + JQStore {} -> undefined suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ()) suspendQueue st sq = case queueStore st of MQStore {} -> suspendQueue' st sq + JQStore {} -> undefined updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = case queueStore st of MQStore {} -> updateQueueTime' st sq t + JQStore {} -> undefined getMsgQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> StoreIO s (JournalMsgQueue s) getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} = @@ -386,6 +442,8 @@ instance MsgStoreClass (JournalMsgStore s) where let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (strEncode rId) queue = JMQueue {queueDirectory = dir, statePath} + -- TODO this should account for the possibility that the folder exists, + -- but queue files do not q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue) atomically $ writeTVar msgQueue_ $ Just q pure q @@ -547,10 +605,10 @@ updateActiveAt :: JournalQueue s -> IO () updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a -tryStore' op rId = tryStore op rId . fmap Right +tryStore' op rId = ExceptT . tryStore op rId . fmap Right -tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a -tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure +tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> IO (Either ErrorType a) +tryStore op rId a = E.mask_ $ E.try a >>= either storeErr pure where storeErr :: E.SomeException -> IO (Either ErrorType a) storeErr e = @@ -558,7 +616,7 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure in logError ("STORE: " <> T.pack e') $> Left (STORE e') isolateQueueId :: String -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a -isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op +isolateQueueId op ms rId = ExceptT . tryStore op rId . withLockMap (queueLocks ms) rId op openMsgQueue :: JournalMsgStore s -> JMQueue -> IO (JournalMsgQueue s) openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do @@ -620,6 +678,9 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP let (seg, s') = B.splitAt 2 s in seg : splitSegments (n - 1) s' +queueRecPath :: FilePath -> String -> FilePath +queueRecPath dir queueId = dir (queueRecFileName <> "." <> queueId <> logFileExt) + msgQueueStatePath :: FilePath -> String -> FilePath msgQueueStatePath dir queueId = dir (queueLogFileName <> "." <> queueId <> logFileExt) @@ -788,6 +849,7 @@ deleteQueue_ :: forall s. JournalMsgStore s -> RecipientId -> JournalQueue s -> deleteQueue_ ms rId q = runExceptT $ isolateQueueId "deleteQueue_" ms rId $ case queueStore ms of MQStore {} -> deleteQueue' ms rId q >>= mapM remove + JQStore {} -> undefined where remove :: (QueueRec, Maybe (JournalMsgQueue s)) -> IO (QueueRec, Maybe (JournalMsgQueue s)) remove r@(_, mq_) = do diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 8a29461b4..0dbe4bc30 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -63,7 +63,7 @@ instance STMQueueStore STMMsgStore where senders' = senders notifiers' = notifiers storeLog' = storeLog - mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing + mkQueue _ qr = STMQueue <$> newTVarIO (Just qr) <*> newTVarIO Nothing instance MsgStoreClass STMMsgStore where type StoreMonad STMMsgStore = STM diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index f43fad442..f000f2d4f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -34,7 +34,7 @@ class MsgStoreClass s => STMQueueStore s where senders' :: s -> TMap SenderId RecipientId notifiers' :: s -> TMap NotifierId RecipientId storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) - mkQueue :: s -> QueueRec -> STM (StoreQueue s) + mkQueue :: s -> QueueRec -> IO (StoreQueue s) class Monad (StoreMonad s) => MsgStoreClass s where type StoreMonad s = (m :: Type -> Type) | m -> s diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 65dd828a6..d058b2e0f 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -44,17 +44,16 @@ import System.IO import UnliftIO.STM addQueue' :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= - atomically add +addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier} = + (mkQueue st qr >>= atomically . add) $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) where - add = ifM hasId (pure $ Left DUPLICATE_) $ do - q <- mkQueue st qr + add q = ifM hasId (pure $ Left DUPLICATE_) $ do TM.insert rId q $ queues' st TM.insert sId rId $ senders' st forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st pure $ Right q - hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] + hasId = foldM (fmap . (||)) False [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier getQueue' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))