From ce24f83b64565a0ef7f397ccd0e135a8a3983198 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 5 Feb 2025 12:04:27 +0000 Subject: [PATCH] refactor STM queues (#1447) --- src/Simplex/Messaging/Server.hs | 99 ++++++----- src/Simplex/Messaging/Server/Env/STM.hs | 4 +- .../Messaging/Server/MsgStore/Journal.hs | 92 +++++----- src/Simplex/Messaging/Server/MsgStore/STM.hs | 66 ++++--- .../Messaging/Server/MsgStore/Types.hs | 87 +++++----- src/Simplex/Messaging/Server/QueueStore.hs | 3 +- .../Messaging/Server/QueueStore/STM.hs | 129 +++++++------- src/Simplex/Messaging/Server/StoreLog.hs | 27 ++- tests/CoreTests/MsgStoreTests.hs | 161 +++++++++--------- tests/CoreTests/StoreLogTests.hs | 24 +-- 10 files changed, 341 insertions(+), 351 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a097da37a..4fecc4bef 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -397,8 +397,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT atomicModifyIORef'_ (msgExpired stats) (+ expired) printMessageStats "STORE: messages" msgStats where - expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do - (expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old + expireQueueMsgs now ms old q = fmap (fromRight newMessageStats) . runExceptT $ do + (expired_, stored) <- idleDeleteExpiredMsgs now ms q old pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1} expireNtfsThread :: ServerConfig -> M () @@ -429,8 +429,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats AMS _ st <- asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st + let STMQueueStore {queues, notifiers} = stmQueueStore st interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -581,13 +580,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT rtm <- getRealTimeMetrics env T.writeFile metricsFile $ prometheusMetrics sm rtm ts - getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics + getServerMetrics :: STMStoreClass s => s -> ServerStats -> IO ServerMetrics getServerMetrics st ss = do d <- getServerStatsData ss let ps = periodStatDataCounts $ _activeQueues d psNtf = periodStatDataCounts $ _activeQueuesNtf d - queueCount <- M.size <$> readTVarIO (activeMsgQueues st) - notifierCount <- M.size <$> readTVarIO (notifiers' st) + STMQueueStore {queues, notifiers} = stmQueueStore st + queueCount <- M.size <$> readTVarIO queues + notifierCount <- M.size <$> readTVarIO notifiers pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} getRealTimeMetrics :: Env -> IO RealTimeMetrics @@ -675,8 +675,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats AMS _ st <- unliftIO u $ asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st + let STMQueueStore {queues, notifiers} = stmQueueStore st getStat :: (ServerStats -> IORef a) -> IO a getStat var = readIORef (var ss) putStat :: Show a => String -> (ServerStats -> IORef a) -> IO () @@ -852,8 +851,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPDelete sId -> withUserRole $ unliftIO u $ do AMS _ st <- asks msgStore r <- liftIO $ runExceptT $ do - (q, qr) <- ExceptT $ getQueueRec st SSender sId - ExceptT $ deleteQueueSize st (recipientId qr) q + q <- ExceptT $ getQueue st SSender sId + ExceptT $ deleteQueueSize st q case r of Left e -> liftIO $ hPutStrLn h $ "error: " <> show e Right (qr, numDeleted) -> do @@ -916,7 +915,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio c <- liftIO $ newClient msType clientId q thVersion sessionId ts runClientThreads msType ms active c clientId `finally` clientDisconnected c where - runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M () + runClientThreads :: STMStoreClass (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M () runClientThreads msType ms active c clientId = do atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c) s <- asks server @@ -972,7 +971,7 @@ cancelSub s = case subThread s of _ -> pure () ProhibitSub -> pure () -receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M () +receive :: forall c s. (Transport c, STMStoreClass s) => THandleSMP c 'TServer -> s -> Client s -> M () receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" sa <- asks serverActive @@ -1072,7 +1071,7 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail -- - the queue or party key do not exist. -- In all cases, the time of the verification should depend only on the provided authorization type, -- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result. -verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s) +verifyTransmission :: forall s. STMStoreClass s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s) verifyTransmission ms auth_ tAuth authorized queueId cmd = case cmd of Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k @@ -1149,7 +1148,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do action `finally` atomically (modifyTVar' endThreads $ IM.delete tId) mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId -client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () +client :: forall s. STMStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () client thParams' Server {subscribedQ, ntfSubscribedQ, subscribers} @@ -1282,10 +1281,9 @@ client updatedAt <- Just <$> liftIO getSystemDate let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure} - qRec (recipientId, senderId) = + qRec senderId = QueueRec - { recipientId, - senderId, + { senderId, recipientKey, rcvDhSecret, senderKey = Nothing, @@ -1297,12 +1295,12 @@ client (corrId,entId,) <$> addQueueRetry 3 qik qRec where addQueueRetry :: - Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> ((RecipientId, SenderId) -> QueueRec) -> M BrokerMsg + Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> (SenderId -> QueueRec) -> M BrokerMsg addQueueRetry 0 _ _ = pure $ ERR INTERNAL addQueueRetry n qik qRec = do - ids <- getIds - let qr = qRec ids - liftIO (addQueue ms qr) >>= \case + ids@(rId, sId) <- getIds + let qr = qRec sId + liftIO (addQueue ms rId qr) >>= \case Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec Left e -> pure $ ERR e Right q -> do @@ -1379,7 +1377,7 @@ client incStat $ qSubDuplicate stats atomically (tryTakeTMVar $ delivered s) >> deliver False s where - rId = recipientId qr + rId = recipientId' q newSub :: M Sub newSub = time "SUB newSub" . atomically $ do writeTQueue subscribedQ (rId, clientId, True) @@ -1390,7 +1388,7 @@ client deliver inc sub = do stats <- asks serverStats fmap (either (\e -> (corrId, rId, ERR e)) id) $ liftIO $ runExceptT $ do - msg_ <- tryPeekMsg ms rId q + msg_ <- tryPeekMsg ms q liftIO $ when (inc && isJust msg_) $ incStat (qSub stats) liftIO $ deliverMessage "SUB" qr rId sub msg_ @@ -1424,7 +1422,7 @@ client getMessage_ s delivered_ = do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ - tryPeekMsg ms (recipientId qr) q >>= \case + tryPeekMsg ms q >>= \case Just msg -> do let encMsg = encryptMsg qr msg incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats @@ -1471,11 +1469,11 @@ client fmap (either err id) $ liftIO $ runExceptT $ do case st of ProhibitSub -> do - deletedMsg_ <- tryDelMsg ms (recipientId qr) q msgId + deletedMsg_ <- tryDelMsg ms q msgId liftIO $ mapM_ (updateStats stats True) deletedMsg_ pure ok _ -> do - (deletedMsg_, msg_) <- tryDelPeekMsg ms (recipientId qr) q msgId + (deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId liftIO $ mapM_ (updateStats stats False) deletedMsg_ liftIO $ deliverMessage "ACK" qr entId sub msg_ _ -> pure $ err NO_MSG @@ -1529,7 +1527,7 @@ client msg_ <- liftIO $ time "SEND" $ runExceptT $ do expireMessages messageExpiration stats msg <- liftIO $ mkMessage msgId body - writeMsg ms (recipientId qr) q True msg + writeMsg ms q True msg case msg_ of Left e -> pure $ err e Right Nothing -> do @@ -1540,10 +1538,10 @@ client when (notification msgFlags) $ do mapM_ (`enqueueNotification` msg) (notifier qr) incStat $ msgSentNtf stats - liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) + liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId' q) incStat $ msgSent stats incStat $ msgCount stats - liftIO $ updatePeriodStats (activeQueues stats) (recipientId qr) + liftIO $ updatePeriodStats (activeQueues stats) (recipientId' q) pure ok where mkMessage :: MsgId -> C.MaxLenBS MaxMessageLen -> IO Message @@ -1553,7 +1551,7 @@ client expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO () expireMessages msgExp stats = do - deleted <- maybe (pure 0) (deleteExpiredMsgs ms (recipientId qr) q <=< liftIO . expireBeforeEpoch) msgExp + deleted <- maybe (pure 0) (deleteExpiredMsgs ms q <=< liftIO . expireBeforeEpoch) msgExp liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) -- The condition for delivery of the message is: @@ -1571,7 +1569,7 @@ client whenM (TM.memberIO rId subscribers) $ atomically deliverToSub >>= mapM_ forkDeliver where - rId = recipientId qr + rId = recipientId' q deliverToSub = -- lookup has ot be in the same transaction, -- so that if subscription ends, it re-evalutates @@ -1715,7 +1713,7 @@ client delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg) delQueueAndMsgs (q, _) = do - liftIO (deleteQueue ms entId q) >>= \case + liftIO (deleteQueue ms q) >>= \case Right qr -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ do @@ -1735,11 +1733,11 @@ client Left e -> pure $ err e getQueueInfo :: StoreQueue s -> QueueRec -> M BrokerMsg - getQueueInfo q QueueRec {recipientId = rId, senderKey, notifier} = do + getQueueInfo q QueueRec {senderKey, notifier} = do fmap (either ERR id) $ liftIO $ runExceptT $ do qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub - qiSize <- getQueueSize ms rId q - qiMsg <- toMsgInfo <$$> tryPeekMsg ms rId q + qiSize <- getQueueSize ms q + qiMsg <- toMsgInfo <$$> tryPeekMsg ms q let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} pure $ INFO info where @@ -1809,8 +1807,9 @@ exportMessages tty ms f drainMsgs = do logError $ "error exporting messages: " <> tshow e exitFailure where - saveQueueMsgs h rId q = - runExceptT (getQueueMessages drainMsgs ms rId q) >>= \case + saveQueueMsgs h q = do + let rId = recipientId' q + runExceptT (getQueueMessages drainMsgs ms q) >>= \case Right msgs -> Sum (length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs) Left e -> do logError $ "STORE: saveQueueMsgs, error exporting messages from queue " <> decodeLatin1 (strEncode rId) <> ", " <> tshow e @@ -1838,7 +1837,7 @@ processServerMessages = do withAllMsgQueues False ms $ processValidateQueue | otherwise -> logWarn "skipping message expiration" $> Nothing where - processExpireQueue old rId q = + processExpireQueue old q = runExceptT expireQueue >>= \case Right (storedMsgsCount, expiredMsgsCount) -> pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1} @@ -1847,20 +1846,20 @@ processServerMessages = do exitFailure where expireQueue = do - expired'' <- deleteExpiredMsgs ms rId q old - stored'' <- getQueueSize ms rId q + expired'' <- deleteExpiredMsgs ms q old + stored'' <- getQueueSize ms q liftIO $ closeMsgQueue q pure (stored'', expired'') - processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats - processValidateQueue rId q = - runExceptT (getQueueSize ms rId q) >>= \case + processValidateQueue :: JournalQueue -> IO MessageStats + processValidateQueue q = + runExceptT (getQueueSize ms q) >>= \case Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1} Left e -> do logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e exitFailure -- TODO this function should be called after importing queues from store log -importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats +importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats importMessages tty ms f old_ = do logInfo $ "restoring messages from file " <> T.pack f LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case @@ -1873,7 +1872,7 @@ importMessages tty ms f old_ = do renameFile f $ f <> ".bak" mapM_ setOverQuota_ overQuota logQueueStates ms - storedQueues <- M.size <$> readTVarIO (activeMsgQueues ms) + storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms) pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues} where progress i = "Processed " <> show i <> " lines" @@ -1892,7 +1891,7 @@ importMessages tty ms f old_ = do (i + 1,Just (rId, q),) <$> case msg of Message {msgTs} | maybe True (systemSeconds msgTs >=) old_ -> do - writeMsg ms rId q False msg >>= \case + writeMsg ms q False msg >>= \case Just _ -> pure (stored + 1, expired, overQuota) Nothing -> do logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) @@ -1901,11 +1900,11 @@ importMessages tty ms f old_ = do MessageQuota {} -> -- queue was over quota at some point, -- it will be set as over quota once fully imported - mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota) + mergeQuotaMsgs >> writeMsg ms q False msg $> (stored, expired, M.insert rId q overQuota) where -- if the first message in queue head is "quota", remove it. mergeQuotaMsgs = - withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \case + withPeekMsgQueue ms q "mergeQuotaMsgs" $ maybe (pure ()) $ \case (mq, MessageQuota {}) -> tryDeleteMsg_ q mq False _ -> pure () msgErr :: Show e => String -> e -> String @@ -1982,7 +1981,7 @@ restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >> Right d@ServerStatsData {_qCount = statsQCount, _msgCount = statsMsgCount, _ntfCount = statsNtfCount} -> do s <- asks serverStats AMS _ st <- asks msgStore - _qCount <- M.size <$> readTVarIO (activeMsgQueues st) + _qCount <- M.size <$> readTVarIO (queues $ stmQueueStore st) let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_ _ntfCount = storedMsgsCount ntfStats _msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_ diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index f7a9cc7e8..4044e0d22 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -189,7 +189,7 @@ type family MsgStore s where MsgStore 'MSMemory = STMMsgStore MsgStore 'MSJournal = JournalMsgStore -data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s) +data AMsgStore = forall s. (STMStoreClass (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s) data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s)) @@ -362,5 +362,5 @@ newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random pure ProxyAgent {smpAgent} -readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode) +readWriteQueueStore :: STMStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode) readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 4e5496f66..0834ad1d9 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -15,7 +15,7 @@ {-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.MsgStore.Journal - ( JournalMsgStore (queues, senders, notifiers, random), + ( JournalMsgStore (queueStore, random), JournalQueue, JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), @@ -77,10 +77,7 @@ data JournalMsgStore = JournalMsgStore { config :: JournalStoreConfig, random :: TVar StdGen, queueLocks :: TMap RecipientId Lock, - queues :: TMap RecipientId JournalQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, - storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + queueStore :: STMQueueStore JournalQueue } data JournalStoreConfig = JournalStoreConfig @@ -98,7 +95,8 @@ data JournalStoreConfig = JournalStoreConfig } data JournalQueue = JournalQueue - { queueLock :: Lock, + { recipientId :: RecipientId, + queueLock :: Lock, -- To avoid race conditions and errors when restoring queues, -- Nothing is written to TVar when queue is deleted. queueRec :: TVar (Maybe QueueRec), @@ -218,18 +216,15 @@ logFileExt = ".log" newtype StoreIO a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) -instance STMQueueStore JournalMsgStore where - queues' = queues - senders' = senders - notifiers' = notifiers - storeLog' = storeLog - mkQueue st qr = do - lock <- getMapLock (queueLocks st) $ recipientId qr +instance STMStoreClass JournalMsgStore where + stmQueueStore JournalMsgStore {queueStore} = queueStore + mkQueue st rId qr = do + lock <- getMapLock (queueLocks st) rId q <- newTVar $ Just qr mq <- newTVar Nothing activeAt <- newTVar 0 isEmpty <- newTVar Nothing - pure $ JournalQueue lock q mq activeAt isEmpty + pure $ JournalQueue rId lock q mq activeAt isEmpty msgQueue_' = msgQueue_ instance MsgStoreClass JournalMsgStore where @@ -242,27 +237,21 @@ instance MsgStoreClass JournalMsgStore where newMsgStore config = do random <- newTVarIO =<< newStdGen queueLocks <- TM.emptyIO - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - storeLog <- newTVarIO Nothing - pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog} + queueStore <- newQueueStore + pure JournalMsgStore {config, random, queueLocks, queueStore} setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO () - setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) + setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl) - closeMsgStore st = do + closeMsgStore JournalMsgStore {queueStore = st} = do readTVarIO (storeLog st) >>= mapM_ closeStoreLog readTVarIO (queues st) >>= mapM_ closeMsgQueue - activeMsgQueues = queues - {-# INLINE activeMsgQueues #-} - -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. -- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID. -- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name. - withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalQueue -> IO a) -> IO a + withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (JournalQueue -> IO a) -> IO a withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty) where processStore = do @@ -276,7 +265,7 @@ instance MsgStoreClass JournalMsgStore where r' <- case strDecode $ B.pack queueId of Right rId -> getQueue ms SRecipient rId >>= \case - Right q -> unStoreIO (getMsgQueue ms rId q) *> action rId q <* closeMsgQueue q + Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q Left AUTH -> do logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir removeQueueDirectory_ dir @@ -303,7 +292,7 @@ instance MsgStoreClass JournalMsgStore where (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) logQueueStates :: JournalMsgStore -> IO () - logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState + logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState logQueueState :: JournalQueue -> StoreIO () logQueueState q = @@ -312,11 +301,14 @@ instance MsgStoreClass JournalMsgStore where $>>= \mq -> readTVarIO (handles mq) $>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ()) + recipientId' = recipientId + {-# INLINE recipientId' #-} + queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO JournalMsgQueue - getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} = + getMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO JournalMsgQueue + getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} = StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where newQ = do @@ -334,8 +326,8 @@ instance MsgStoreClass JournalMsgStore where journalId <- newJournalId random mkJournalQueue queue (newMsgQueueState journalId) Nothing - getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message)) - getPeekMsgQueue ms rId q@JournalQueue {isEmpty} = + getPeekMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message)) + getPeekMsgQueue ms q@JournalQueue {isEmpty} = StoreIO (readTVarIO isEmpty) >>= \case Just True -> pure Nothing Just False -> peek @@ -350,16 +342,16 @@ instance MsgStoreClass JournalMsgStore where pure r where peek = do - mq <- getMsgQueue ms rId q + mq <- getMsgQueue ms q (mq,) <$$> tryPeekMsg_ q mq -- only runs action if queue is not empty - withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int) - withIdleMsgQueue now ms@JournalMsgStore {config} rId q action = + withIdleMsgQueue :: Int64 -> JournalMsgStore -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int) + withIdleMsgQueue now ms@JournalMsgStore {config} q action = StoreIO $ readTVarIO (msgQueue_ q) >>= \case Nothing -> E.bracket - (unStoreIO $ getPeekMsgQueue ms rId q) + (unStoreIO $ getPeekMsgQueue ms q) (mapM_ $ \_ -> closeMsgQueue q) (maybe (pure (Nothing, 0)) (unStoreIO . run)) where @@ -375,13 +367,12 @@ instance MsgStoreClass JournalMsgStore where sz <- unStoreIO $ getQueueSize_ mq pure (r, sz) - deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) - deleteQueue ms rId q = - fst <$$> deleteQueue_ ms rId q + deleteQueue :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms q = fst <$$> deleteQueue_ ms q - deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms rId q = - deleteQueue_ ms rId q >>= mapM (traverse getSize) + deleteQueueSize :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms q = + deleteQueue_ ms q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) @@ -397,9 +388,9 @@ instance MsgStoreClass JournalMsgStore where updateReadPos q drainMsgs len hs (msg :) <$> run msgs - writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do - q <- getMsgQueue ms rId q' + writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do + q <- getMsgQueue ms q' StoreIO $ (`E.finally` updateActiveAt q') $ do st@MsgQueueState {canWrite, size} <- readTVarIO (state q) let empty = size == 0 @@ -476,9 +467,9 @@ instance MsgStoreClass JournalMsgStore where $>>= \len -> readTVarIO handles $>>= \hs -> updateReadPos mq logState len hs $> Just () - isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a - isolateQueue rId JournalQueue {queueLock} op = - tryStore' op rId . withLock' queueLock op . unStoreIO + isolateQueue :: JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a + isolateQueue JournalQueue {recipientId, queueLock} op = + tryStore' op recipientId . withLock' queueLock op . unStoreIO updateActiveAt :: JournalQueue -> IO () updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime @@ -721,11 +712,12 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws -deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue)) -deleteQueue_ ms rId q = +deleteQueue_ :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue)) +deleteQueue_ ms q = runExceptT $ isolateQueueId "deleteQueue_" ms rId $ - deleteQueue' ms rId q >>= mapM remove + deleteQueue' ms q >>= mapM remove where + rId = recipientId q remove r@(_, mq_) = do mapM_ closeMsgQueueHandles mq_ removeQueueDirectory ms rId diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index cbeb75f9c..2df41a9f1 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -26,22 +26,18 @@ import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util ((<$$>), ($>>=)) import System.IO (IOMode (..)) data STMMsgStore = STMMsgStore { storeConfig :: STMStoreConfig, - queues :: TMap RecipientId STMQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, - storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + queueStore :: STMQueueStore STMQueue } data STMQueue = STMQueue { -- To avoid race conditions and errors when restoring queues, -- Nothing is written to TVar when queue is deleted. + recipientId :: RecipientId, queueRec :: TVar (Maybe QueueRec), msgQueue_ :: TVar (Maybe STMMsgQueue) } @@ -57,12 +53,9 @@ data STMStoreConfig = STMStoreConfig quota :: Int } -instance STMQueueStore STMMsgStore where - queues' = queues - senders' = senders - notifiers' = notifiers - storeLog' = storeLog - mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing +instance STMStoreClass STMMsgStore where + stmQueueStore = queueStore + mkQueue _ rId qr = STMQueue rId <$> newTVar (Just qr) <*> newTVar Nothing msgQueue_' = msgQueue_ instance MsgStoreClass STMMsgStore where @@ -73,32 +66,31 @@ instance MsgStoreClass STMMsgStore where newMsgStore :: STMStoreConfig -> IO STMMsgStore newMsgStore storeConfig = do - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - storeLog <- newTVarIO Nothing - pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog} + queueStore <- newQueueStore + pure STMMsgStore {storeConfig, queueStore} setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO () - setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) + setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl) - closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog - - activeMsgQueues = queues - {-# INLINE activeMsgQueues #-} + closeMsgStore st = readTVarIO (storeLog $ queueStore st) >>= mapM_ closeStoreLog withAllMsgQueues _ = withActiveMsgQueues {-# INLINE withAllMsgQueues #-} logQueueStates _ = pure () + {-# INLINE logQueueStates #-} logQueueState _ = pure () + {-# INLINE logQueueState #-} + + recipientId' = recipientId + {-# INLINE recipientId' #-} queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM STMMsgQueue - getMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure + getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue + getMsgQueue _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure where newQ = do msgQueue <- newTQueue @@ -108,23 +100,23 @@ instance MsgStoreClass STMMsgStore where writeTVar msgQueue_ (Just q) pure q - getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message)) - getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq + getPeekMsgQueue :: STMMsgStore -> STMQueue -> STM (Maybe (STMMsgQueue, Message)) + getPeekMsgQueue _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq -- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue) - withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int) - withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case + withIdleMsgQueue :: Int64 -> STMMsgStore -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int) + withIdleMsgQueue _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case Just q -> do r <- action q sz <- getQueueSize_ q pure (Just r, sz) Nothing -> pure (Nothing, 0) - deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec) - deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q + deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec) + deleteQueue ms q = fst <$$> deleteQueue' ms q - deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) - deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize) + deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size) @@ -137,9 +129,9 @@ instance MsgStoreClass STMMsgStore where mapM_ (writeTQueue q) msgs pure msgs - writeMsg :: STMMsgStore -> RecipientId -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) - writeMsg ms rId q' _logState msg = liftIO $ atomically $ do - STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms rId q' + writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg ms q' _logState msg = liftIO $ atomically $ do + STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' canWrt <- readTVar canWrite empty <- isEmptyTQueue q if canWrt || empty @@ -171,5 +163,5 @@ instance MsgStoreClass STMMsgStore where Just _ -> modifyTVar' size (subtract 1) _ -> pure () - isolateQueue :: RecipientId -> STMQueue -> String -> STM a -> ExceptT ErrorType IO a - isolateQueue _ _ _ = liftIO . atomically + isolateQueue :: STMQueue -> String -> STM a -> ExceptT ErrorType IO a + isolateQueue _ _ = liftIO . atomically diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 8754767cd..7317a0fab 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -20,7 +20,6 @@ import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) import Data.Kind -import qualified Data.Map.Strict as M import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore @@ -29,12 +28,16 @@ import Simplex.Messaging.TMap (TMap) import Simplex.Messaging.Util ((<$$>)) import System.IO (IOMode (..)) -class MsgStoreClass s => STMQueueStore s where - queues' :: s -> TMap RecipientId (StoreQueue s) - senders' :: s -> TMap SenderId RecipientId - notifiers' :: s -> TMap NotifierId RecipientId - storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) - mkQueue :: s -> QueueRec -> STM (StoreQueue s) +data STMQueueStore q = STMQueueStore + { queues :: TMap RecipientId q, + senders :: TMap SenderId RecipientId, + notifiers :: TMap NotifierId RecipientId, + storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + } + +class MsgStoreClass s => STMStoreClass s where + stmQueueStore :: s -> STMQueueStore (StoreQueue s) + mkQueue :: s -> RecipientId -> QueueRec -> STM (StoreQueue s) msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) class Monad (StoreMonad s) => MsgStoreClass s where @@ -45,25 +48,25 @@ class Monad (StoreMonad s) => MsgStoreClass s where newMsgStore :: MsgStoreConfig s -> IO s setStoreLog :: s -> StoreLog 'WriteMode -> IO () closeMsgStore :: s -> IO () - activeMsgQueues :: s -> TMap RecipientId (StoreQueue s) - withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a + withAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () + recipientId' :: StoreQueue s -> RecipientId queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) - getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) - getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s) + getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) + getMsgQueue :: s -> StoreQueue s -> StoreMonad s (MsgQueue s) -- the journal queue will be closed after action if it was initially closed or idle longer than interval in config - withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) - deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec) - deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) + withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int) + deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec) + deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int)) getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message] - writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running getQueueSize_ :: MsgQueue s -> StoreMonad s Int tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message) tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s () - isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a + isolateQueue :: StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a data MSType = MSMemory | MSJournal @@ -73,28 +76,26 @@ data SMSType :: MSType -> Type where data AMSType = forall s. AMSType (SMSType s) -withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a -withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs +withActiveMsgQueues :: (STMStoreClass s, Monoid a) => s -> (StoreQueue s -> IO a) -> IO a +withActiveMsgQueues st f = readTVarIO (queues $ stmQueueStore st) >>= foldM run mempty where - run !acc (k, v) = do - r <- f k v - pure $! acc <> r + run !acc = fmap (acc <>) . f -getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message] -getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst) +getQueueMessages :: MsgStoreClass s => Bool -> s -> StoreQueue s -> ExceptT ErrorType IO [Message] +getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst) {-# INLINE getQueueMessages #-} -getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int -getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst) +getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int +getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst) {-# INLINE getQueueSize #-} -tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) -tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure +tryPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) +tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure {-# INLINE tryPeekMsg #-} -tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) -tryDelMsg st rId q msgId' = - withPeekMsgQueue st rId q "tryDelMsg" $ +tryDelMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) +tryDelMsg st q msgId' = + withPeekMsgQueue st q "tryDelMsg" $ maybe (pure Nothing) $ \(mq, msg) -> if | messageId msg == msgId' -> @@ -102,30 +103,30 @@ tryDelMsg st rId q msgId' = | otherwise -> pure Nothing -- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) -tryDelPeekMsg st rId q msgId' = - withPeekMsgQueue st rId q "tryDelPeekMsg" $ +tryDelPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message) +tryDelPeekMsg st q msgId' = + withPeekMsgQueue st q "tryDelPeekMsg" $ maybe (pure (Nothing, Nothing)) $ \(mq, msg) -> if | messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq) | otherwise -> pure (Nothing, Just msg) -- The action is called with Nothing when it is known that the queue is empty -withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a -withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a +withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a +withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a {-# INLINE withPeekMsgQueue #-} -deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int -deleteExpiredMsgs st rId q old = - isolateQueue rId q "deleteExpiredMsgs" $ - getMsgQueue st rId q >>= deleteExpireMsgs_ old q +deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int +deleteExpiredMsgs st q old = + isolateQueue q "deleteExpiredMsgs" $ + getMsgQueue st q >>= deleteExpireMsgs_ old q -- closed and idle queues will be closed after expiration -- returns (expired count, queue size after expiration) -idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int) -idleDeleteExpiredMsgs now st rId q old = - isolateQueue rId q "idleDeleteExpiredMsgs" $ - withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q) +idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int) +idleDeleteExpiredMsgs now st q old = + isolateQueue q "idleDeleteExpiredMsgs" $ + withIdleMsgQueue now st q (deleteExpireMsgs_ old q) deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int deleteExpireMsgs_ old q mq = do diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index a40875680..af26b91f7 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -16,8 +16,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol data QueueRec = QueueRec - { recipientId :: !RecipientId, - recipientKey :: !RcvPublicAuthKey, + { recipientKey :: !RcvPublicAuthKey, rcvDhSecret :: !RcvDhSecret, senderId :: !SenderId, senderKey :: !(Maybe SndPublicAuthKey), diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index a073b5500..f1347533a 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -25,6 +25,7 @@ module Simplex.Messaging.Server.QueueStore.STM unblockQueue, updateQueueTime, deleteQueue', + newQueueStore, readQueueStore, withLog', ) @@ -51,99 +52,106 @@ import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$)) import System.IO import UnliftIO.STM -addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= - atomically add - $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) - where - add = ifM hasId (pure $ Left DUPLICATE_) $ do - q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing - TM.insert rId q $ queues' st - TM.insert sId rId $ senders' st - forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st - pure $ Right q - hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] - hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier +newQueueStore :: IO (STMQueueStore q) +newQueueStore = do + queues <- TM.emptyIO + senders <- TM.emptyIO + notifiers <- TM.emptyIO + storeLog <- newTVarIO Nothing + pure STMQueueStore {queues, senders, notifiers, storeLog} -getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) +addQueue :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) +addQueue st rId qr@QueueRec {senderId = sId, notifier}= + atomically add + $>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr) + where + STMQueueStore {queues, senders, notifiers} = stmQueueStore st + add = ifM hasId (pure $ Left DUPLICATE_) $ do + q <- mkQueue st rId qr + TM.insert rId q queues + TM.insert sId rId senders + forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers + pure $ Right q + hasId = or <$> sequence [TM.member rId queues, TM.member sId senders, hasNotifier] + hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier + +getQueue :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) getQueue st party qId = maybe (Left AUTH) Right <$> case party of - SRecipient -> TM.lookupIO qId $ queues' st - SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st) - SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st) + SRecipient -> TM.lookupIO qId queues + SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues) + SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues) + where + STMQueueStore {queues, senders, notifiers} = stmQueueStore st -getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) +getQueueRec :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) getQueueRec st party qId = getQueue st party qId $>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q)) -secureQueue :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) +secureQueue :: STMStoreClass s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = atomically (readQueueRec qr $>>= secure) - $>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey + $>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey where qr = queueRec' sq - secure q@QueueRec {recipientId = rId} = case senderKey q of - Just k -> pure $ if sKey == k then Right rId else Left AUTH + secure q = case senderKey q of + Just k -> pure $ if sKey == k then Right () else Left AUTH Nothing -> do writeTVar qr $ Just q {senderKey = Just sKey} - pure $ Right rId + pure $ Right () -addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) +addQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = atomically (readQueueRec qr $>>= add) - $>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) + $>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) where + rId = recipientId' sq qr = queueRec' sq - add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do - nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId + STMQueueStore {notifiers} = stmQueueStore st + add q = ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do + nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId let !q' = q {notifier = Just ntfCreds} writeTVar qr $ Just q' - TM.insert nId rId $ notifiers' st - pure $ Right (rId, nId_) + TM.insert nId rId notifiers + pure $ Right nId_ -deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) +deleteQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = atomically (readQueueRec qr >>= mapM delete) - $>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) + $>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq) where qr = queueRec' sq - delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do - TM.delete notifierId $ notifiers' st + delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do + TM.delete notifierId $ notifiers $ stmQueueStore st writeTVar qr $! Just q {notifier = Nothing} pure notifierId -suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +suspendQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ()) suspendQueue st sq = atomically (readQueueRec qr >>= mapM suspend) - $>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId) + $>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq) where qr = queueRec' sq - suspend q = do - writeTVar qr $! Just q {status = EntityOff} - pure $ recipientId q + suspend q = writeTVar qr $! Just q {status = EntityOff} -blockQueue :: STMQueueStore s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ()) +blockQueue :: STMStoreClass s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ()) blockQueue st sq info = atomically (readQueueRec qr >>= mapM block) - $>>= \rId -> withLog "blockQueue" st (\sl -> logBlockQueue sl rId info) + $>>= \_ -> withLog "blockQueue" st (\sl -> logBlockQueue sl (recipientId' sq) info) where qr = queueRec' sq - block q = do - writeTVar qr $ Just q {status = EntityBlocked info} - pure $ recipientId q + block q = writeTVar qr $ Just q {status = EntityBlocked info} -unblockQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +unblockQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ()) unblockQueue st sq = atomically (readQueueRec qr >>= mapM unblock) - $>>= \rId -> withLog "unblockQueue" st (`logUnblockQueue` rId) + $>>= \_ -> withLog "unblockQueue" st (`logUnblockQueue` recipientId' sq) where qr = queueRec' sq - unblock q = do - writeTVar qr $ Just q {status = EntityActive} - pure $ recipientId q + unblock q = writeTVar qr $ Just q {status = EntityActive} -updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) +updateQueueTime :: STMStoreClass s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' where qr = queueRec' sq @@ -153,20 +161,21 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' let !q' = q {updatedAt = Just t} in (writeTVar qr $! Just q') $> (q', True) log' (q, changed) - | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t) + | changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t) | otherwise = pure $ Right q -deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) -deleteQueue' st rId sq = +deleteQueue' :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s))) +deleteQueue' st sq = atomically (readQueueRec qr >>= mapM delete) - $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId) + $>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq) >>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing)) where qr = queueRec' sq + STMQueueStore {senders, notifiers} = stmQueueStore st delete q = do writeTVar qr Nothing - TM.delete (senderId q) $ senders' st - forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st + TM.delete (senderId q) senders + forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers pure q readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec) @@ -183,10 +192,10 @@ withLog' name sl action = where err = name <> ", withLog, " <> show e -withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) -withLog name = withLog' name . storeLog' +withLog :: STMStoreClass s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) +withLog name = withLog' name . storeLog . stmQueueStore -readQueueStore :: forall s. STMQueueStore s => FilePath -> s -> IO () +readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO () readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines where processLine :: LB.ByteString -> IO () @@ -195,13 +204,13 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin s = LB.toStrict s' procLogRecord :: StoreLogRecord -> IO () procLogRecord = \case - CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue" + CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue" SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st - DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId + DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t printError :: String -> IO () diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 8dee31940..e676770f7 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -54,7 +54,7 @@ import System.Directory (doesFileExist, renameFile) import System.IO data StoreLogRecord - = CreateQueue QueueRec + = CreateQueue RecipientId QueueRec | SecureQueue QueueId SndPublicAuthKey | AddNotifier QueueId NtfCreds | SuspendQueue QueueId @@ -77,10 +77,9 @@ data SLRTag | UpdateTime_ instance StrEncoding QueueRec where - strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = + strEncode QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = B.unwords - [ "rid=" <> strEncode recipientId, - "rk=" <> strEncode recipientKey, + [ "rk=" <> strEncode recipientKey, "rdh=" <> strEncode rcvDhSecret, "sid=" <> strEncode senderId, "sk=" <> strEncode senderKey @@ -98,7 +97,6 @@ instance StrEncoding QueueRec where _ -> " status=" <> strEncode status strP = do - recipientId <- "rid=" *> strP_ recipientKey <- "rk=" *> strP_ rcvDhSecret <- "rdh=" *> strP_ senderId <- "sid=" *> strP_ @@ -107,7 +105,7 @@ instance StrEncoding QueueRec where notifier <- optional $ " notifier=" *> strP updatedAt <- optional $ " updated_at=" *> strP status <- (" status=" *> strP) <|> pure EntityActive - pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} + pure QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} instance StrEncoding SLRTag where strEncode = \case @@ -136,7 +134,7 @@ instance StrEncoding SLRTag where instance StrEncoding StoreLogRecord where strEncode = \case - CreateQueue q -> strEncode (CreateQueue_, q) + CreateQueue rId q -> B.unwords [strEncode CreateQueue_, "rid=" <> strEncode rId, strEncode q] SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey) AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds) SuspendQueue rId -> strEncode (SuspendQueue_, rId) @@ -148,7 +146,7 @@ instance StrEncoding StoreLogRecord where strP = strP_ >>= \case - CreateQueue_ -> CreateQueue <$> strP + CreateQueue_ -> CreateQueue <$> ("rid=" *> strP_) <*> strP SecureQueue_ -> SecureQueue <$> strP_ <*> strP AddNotifier_ -> AddNotifier <$> strP_ <*> strP SuspendQueue_ -> SuspendQueue <$> strP @@ -186,8 +184,8 @@ writeStoreLogRecord (WriteStoreLog _ h) r = E.uninterruptibleMask_ $ do B.hPut h $ strEncode r `B.snoc` '\n' -- hPutStrLn makes write non-atomic for length > 1024 hFlush h -logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO () -logCreateQueue s = writeStoreLogRecord s . CreateQueue +logCreateQueue :: StoreLog 'WriteMode -> RecipientId -> QueueRec -> IO () +logCreateQueue s rId q = writeStoreLogRecord s $ CreateQueue rId q logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SndPublicAuthKey -> IO () logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey @@ -248,10 +246,11 @@ readWriteStoreLog readStore writeStore f st = renameFile tempBackup timedBackup logInfo $ "original state preserved as " <> T.pack timedBackup -writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO () -writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs +writeQueueStore :: STMStoreClass s => StoreLog 'WriteMode -> s -> IO () +writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs where + qs = queues $ stmQueueStore st writeQueue (rId, q) = readTVarIO (queueRec' q) >>= \case - Just q' -> logCreateQueue s q' - Nothing -> atomically $ TM.delete rId $ activeMsgQueues st + Just q' -> logCreateQueue s rId q' + Nothing -> atomically $ TM.delete rId qs diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index f9afecf5a..f97930b52 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -58,12 +58,12 @@ msgStoreTests = do it "should create write file when missing" testWriteFileMissing it "should create read file when read and write files are missing" testReadAndWriteFilesMissing where - someMsgStoreTests :: STMQueueStore s => SpecWith s + someMsgStoreTests :: STMStoreClass s => SpecWith s someMsgStoreTests = do it "should get queue and store/read messages" testGetQueue it "should not fail on EOF when changing read journal" testChangeReadJournal -withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO () +withMsgStore :: STMStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO () withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore testSMTStoreConfig :: STMStoreConfig @@ -105,8 +105,7 @@ testNewQueueRec g sndSecure = do (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g let qr = QueueRec - { recipientId = rId, - recipientKey, + { recipientKey, rcvDhSecret = C.dh' k pk, senderId, senderKey = Nothing, @@ -117,66 +116,66 @@ testNewQueueRec g sndSecure = do } pure (rId, qr) -testGetQueue :: STMQueueStore s => s -> IO () +testGetQueue :: STMStoreClass s => s -> IO () testGetQueue ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- ExceptT $ addQueue ms qr - let write s = writeMsg ms rId q True =<< mkMessage s + q <- ExceptT $ addQueue ms rId qr + let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" Just (Message {msgId = mId3}, False) <- write "message 3" - Msg "message 1" <- tryPeekMsg ms rId q - Msg "message 1" <- tryPeekMsg ms rId q - Nothing <- tryDelMsg ms rId q mId2 - Msg "message 1" <- tryDelMsg ms rId q mId1 - Nothing <- tryDelMsg ms rId q mId1 - Msg "message 2" <- tryPeekMsg ms rId q - Nothing <- tryDelMsg ms rId q mId1 - (Nothing, Msg "message 2") <- tryDelPeekMsg ms rId q mId1 - (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 - (Nothing, Msg "message 3") <- tryDelPeekMsg ms rId q mId2 - Msg "message 3" <- tryPeekMsg ms rId q - (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 - Nothing <- tryDelMsg ms rId q mId2 - Nothing <- tryDelMsg ms rId q mId3 - Nothing <- tryPeekMsg ms rId q + Msg "message 1" <- tryPeekMsg ms q + Msg "message 1" <- tryPeekMsg ms q + Nothing <- tryDelMsg ms q mId2 + Msg "message 1" <- tryDelMsg ms q mId1 + Nothing <- tryDelMsg ms q mId1 + Msg "message 2" <- tryPeekMsg ms q + Nothing <- tryDelMsg ms q mId1 + (Nothing, Msg "message 2") <- tryDelPeekMsg ms q mId1 + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2 + (Nothing, Msg "message 3") <- tryDelPeekMsg ms q mId2 + Msg "message 3" <- tryPeekMsg ms q + (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 + Nothing <- tryDelMsg ms q mId2 + Nothing <- tryDelMsg ms q mId3 + Nothing <- tryPeekMsg ms q Just (Message {msgId = mId4}, True) <- write "message 4" - Msg "message 4" <- tryPeekMsg ms rId q + Msg "message 4" <- tryPeekMsg ms q Just (Message {msgId = mId5}, False) <- write "message 5" - (Nothing, Msg "message 4") <- tryDelPeekMsg ms rId q mId3 - (Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms rId q mId4 + (Nothing, Msg "message 4") <- tryDelPeekMsg ms q mId3 + (Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms q mId4 Just (Message {msgId = mId6}, False) <- write "message 6" Just (Message {msgId = mId7}, False) <- write "message 7" Nothing <- write "message 8" - Msg "message 5" <- tryPeekMsg ms rId q - (Nothing, Msg "message 5") <- tryDelPeekMsg ms rId q mId4 - (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms rId q mId5 - (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms rId q mId6 - (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms rId q mId7 - (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms rId q mId8 - (Nothing, Nothing) <- tryDelPeekMsg ms rId q mId8 - void $ ExceptT $ deleteQueue ms rId q + Msg "message 5" <- tryPeekMsg ms q + (Nothing, Msg "message 5") <- tryDelPeekMsg ms q mId4 + (Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms q mId5 + (Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms q mId6 + (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7 + (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 + (Nothing, Nothing) <- tryDelPeekMsg ms q mId8 + void $ ExceptT $ deleteQueue ms q -testChangeReadJournal :: STMQueueStore s => s -> IO () +testChangeReadJournal :: STMStoreClass s => s -> IO () testChangeReadJournal ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True runRight_ $ do - q <- ExceptT $ addQueue ms qr - let write s = writeMsg ms rId q True =<< mkMessage s + q <- ExceptT $ addQueue ms rId qr + let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" - (Msg "message 1", Nothing) <- tryDelPeekMsg ms rId q mId1 + (Msg "message 1", Nothing) <- tryDelPeekMsg ms q mId1 Just (Message {msgId = mId2}, True) <- write "message 2" - (Msg "message 2", Nothing) <- tryDelPeekMsg ms rId q mId2 + (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 Just (Message {msgId = mId3}, True) <- write "message 3" - (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 + (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 Just (Message {msgId = mId4}, True) <- write "message 4" - (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q mId4 + (Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4 Just (Message {msgId = mId5}, True) <- write "message 5" - (Msg "message 5", Nothing) <- tryDelPeekMsg ms rId q mId5 - void $ ExceptT $ deleteQueue ms rId q + (Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5 + void $ ExceptT $ deleteQueue ms q testExportImportStore :: JournalMsgStore -> IO () testExportImportStore ms = do @@ -185,21 +184,21 @@ testExportImportStore ms = do (rId2, qr2) <- testNewQueueRec g True sl <- readWriteQueueStore testStoreLogFile ms runRight_ $ do - let write rId q s = writeMsg ms rId q True =<< mkMessage s - q1 <- ExceptT $ addQueue ms qr1 - liftIO $ logCreateQueue sl qr1 - Just (Message {}, True) <- write rId1 q1 "message 1" - Just (Message {}, False) <- write rId1 q1 "message 2" - q2 <- ExceptT $ addQueue ms qr2 - liftIO $ logCreateQueue sl qr2 - Just (Message {msgId = mId3}, True) <- write rId2 q2 "message 3" - Just (Message {msgId = mId4}, False) <- write rId2 q2 "message 4" - (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId2 q2 mId3 - (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId2 q2 mId4 - Just (Message {}, True) <- write rId2 q2 "message 5" - Just (Message {}, False) <- write rId2 q2 "message 6" - Just (Message {}, False) <- write rId2 q2 "message 7" - Nothing <- write rId2 q2 "message 8" + let write q s = writeMsg ms q True =<< mkMessage s + q1 <- ExceptT $ addQueue ms rId1 qr1 + liftIO $ logCreateQueue sl rId1 qr1 + Just (Message {}, True) <- write q1 "message 1" + Just (Message {}, False) <- write q1 "message 2" + q2 <- ExceptT $ addQueue ms rId2 qr2 + liftIO $ logCreateQueue sl rId2 qr2 + Just (Message {msgId = mId3}, True) <- write q2 "message 3" + Just (Message {msgId = mId4}, False) <- write q2 "message 4" + (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q2 mId3 + (Msg "message 4", Nothing) <- tryDelPeekMsg ms q2 mId4 + Just (Message {}, True) <- write q2 "message 5" + Just (Message {}, False) <- write q2 "message 6" + Just (Message {}, False) <- write q2 "message 7" + Nothing <- write q2 "message 8" pure () length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 @@ -297,10 +296,10 @@ testMessageState ms = do (rId, qr) <- testNewQueueRec g True let dir = msgQueueDirectory ms rId statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId) - write q s = writeMsg ms rId q True =<< mkMessage s + write q s = writeMsg ms q True =<< mkMessage s mId1 <- runRight $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" liftIO $ closeMsgQueue q @@ -312,19 +311,19 @@ testMessageState ms = do runRight_ $ do q <- ExceptT $ getQueue ms SRecipient rId Just (Message {msgId = mId3}, False) <- write q "message 3" - (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms rId q mId1 - (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 + (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms q mId1 + (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 liftIO $ closeMsgQueue q testReadFileMissing :: JournalMsgStore -> IO () testReadFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True - let write q s = writeMsg ms rId q True =<< mkMessage s + let write q s = writeMsg ms q True =<< mkMessage s q <- runRight $ do - q <- ExceptT $ addQueue ms qr + q <- ExceptT $ addQueue ms rId qr Just (Message {}, True) <- write q "message 1" - Msg "message 1" <- tryPeekMsg ms rId q + Msg "message 1" <- tryPeekMsg ms q pure q mq <- fromJust <$> readTVarIO (msgQueue_' q) @@ -335,9 +334,9 @@ testReadFileMissing ms = do runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId - Nothing <- tryPeekMsg ms rId q' + Nothing <- tryPeekMsg ms q' Just (Message {}, True) <- write q' "message 2" - Msg "message 2" <- tryPeekMsg ms rId q' + Msg "message 2" <- tryPeekMsg ms q' pure () testReadFileMissingSwitch :: JournalMsgStore -> IO () @@ -354,8 +353,8 @@ testReadFileMissingSwitch ms = do runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId - Just (Message {}, False) <- writeMsg ms rId q' True =<< mkMessage "message 6" - Msg "message 5" <- tryPeekMsg ms rId q' + Just (Message {}, False) <- writeMsg ms q' True =<< mkMessage "message 6" + Msg "message 5" <- tryPeekMsg ms q' pure () testWriteFileMissing :: JournalMsgStore -> IO () @@ -373,12 +372,12 @@ testWriteFileMissing ms = do runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId - Just Message {msgId = mId3} <- tryPeekMsg ms rId q' - (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId q' mId3 - Just Message {msgId = mId4} <- tryPeekMsg ms rId q' - (Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q' mId4 - Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" - Msg "message 6" <- tryPeekMsg ms rId q' + Just Message {msgId = mId3} <- tryPeekMsg ms q' + (Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q' mId3 + Just Message {msgId = mId4} <- tryPeekMsg ms q' + (Msg "message 4", Nothing) <- tryDelPeekMsg ms q' mId4 + Just (Message {}, True) <- writeMsg ms q' True =<< mkMessage "message 6" + Msg "message 6" <- tryPeekMsg ms q' pure () testReadAndWriteFilesMissing :: JournalMsgStore -> IO () @@ -395,20 +394,20 @@ testReadAndWriteFilesMissing ms = do runRight_ $ do q' <- ExceptT $ getQueue ms SRecipient rId - Nothing <- tryPeekMsg ms rId q' - Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6" - Msg "message 6" <- tryPeekMsg ms rId q' + Nothing <- tryPeekMsg ms q' + Just (Message {}, True) <- writeMsg ms q' True =<< mkMessage "message 6" + Msg "message 6" <- tryPeekMsg ms q' pure () writeMessages :: JournalMsgStore -> RecipientId -> QueueRec -> IO JournalQueue writeMessages ms rId qr = runRight $ do - q <- ExceptT $ addQueue ms qr - let write s = writeMsg ms rId q True =<< mkMessage s + q <- ExceptT $ addQueue ms rId qr + let write s = writeMsg ms q True =<< mkMessage s Just (Message {msgId = mId1}, True) <- write "message 1" Just (Message {msgId = mId2}, False) <- write "message 2" Just (Message {}, False) <- write "message 3" - (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms rId q mId1 - (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2 + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2 Just (Message {}, False) <- write "message 4" Just (Message {}, False) <- write "message 5" pure q diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 90bea0192..f62fb808f 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -60,38 +60,38 @@ storeLogTests = ("SMP server store log, sndSecure = " <> show sndSecure) [ SLTC { name = "create new queue", - saved = [CreateQueue qr], - compacted = [CreateQueue qr], + saved = [CreateQueue rId qr], + compacted = [CreateQueue rId qr], state = M.fromList [(rId, qr)] }, SLTC { name = "secure queue", - saved = [CreateQueue qr, SecureQueue rId testPublicAuthKey], - compacted = [CreateQueue qr {senderKey = Just testPublicAuthKey}], + saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey], + compacted = [CreateQueue rId qr {senderKey = Just testPublicAuthKey}], state = M.fromList [(rId, qr {senderKey = Just testPublicAuthKey})] }, SLTC { name = "create and delete queue", - saved = [CreateQueue qr, DeleteQueue rId], + saved = [CreateQueue rId qr, DeleteQueue rId], compacted = [], state = M.fromList [] }, SLTC { name = "create queue and add notifier", - saved = [CreateQueue qr, AddNotifier rId ntfCreds], - compacted = [CreateQueue $ qr {notifier = Just ntfCreds}], + saved = [CreateQueue rId qr, AddNotifier rId ntfCreds], + compacted = [CreateQueue rId qr {notifier = Just ntfCreds}], state = M.fromList [(rId, qr {notifier = Just ntfCreds})] }, SLTC { name = "delete notifier", - saved = [CreateQueue qr, AddNotifier rId ntfCreds, DeleteNotifier rId], - compacted = [CreateQueue qr], + saved = [CreateQueue rId qr, AddNotifier rId ntfCreds, DeleteNotifier rId], + compacted = [CreateQueue rId qr], state = M.fromList [(rId, qr)] }, SLTC { name = "update time", - saved = [CreateQueue qr, UpdateTime rId date], - compacted = [CreateQueue qr {updatedAt = Just date}], + saved = [CreateQueue rId qr, UpdateTime rId date], + compacted = [CreateQueue rId qr {updatedAt = Just date}], state = M.fromList [(rId, qr {updatedAt = Just date})] } ] @@ -112,4 +112,4 @@ testSMPStoreLog testSuite tests = ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile compacted' `shouldBe` compacted storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec) - storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec')) + storeState st = M.mapMaybe id <$> (readTVarIO (queues $ stmQueueStore st) >>= mapM (readTVarIO . queueRec'))