From d84a49b85a165bdbcd6ae6cca386eb2d01724542 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 2 Sep 2024 17:06:31 +0100 Subject: [PATCH] smp server: split and reduce STM transactions (#1294) --- src/Simplex/Messaging/Server.hs | 47 +++++++++---------- src/Simplex/Messaging/Server/MsgStore/STM.hs | 45 +++++++++--------- .../Messaging/Server/QueueStore/STM.hs | 34 +++++++------- src/Simplex/Messaging/Transport/HTTP2.hs | 1 - 4 files changed, 62 insertions(+), 65 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d796d8e6c..e84f26a5a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -247,7 +247,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rIds <- M.keysSet <$> readTVarIO ms forM_ rIds $ \rId -> do q <- liftIO $ getMsgQueue ms rId quota - deleted <- atomically $ deleteExpiredMsgs q old + deleted <- liftIO $ deleteExpiredMsgs q old liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) serverStatsThread_ :: ServerConfig -> [M ()] @@ -623,10 +623,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do CPDelete queueId' -> withUserRole $ unliftIO u $ do st <- asks queueStore ms <- asks msgStore - queueId <- atomically (getQueue st SSender queueId') >>= \case + queueId <- liftIO (getQueue st SSender queueId') >>= \case Left _ -> pure queueId' -- fallback to using as recipientId directly Right QueueRec {recipientId} -> pure recipientId - r <- atomically $ + r <- liftIO $ deleteQueue st queueId $>>= \q -> Right . (q,) <$> delMsgQueueSize ms queueId case r of @@ -832,7 +832,7 @@ verifyTransmission auth_ tAuth authorized queueId cmd = get :: DirectParty p => SParty p -> M (Either ErrorType QueueRec) get party = do st <- asks queueStore - atomically $ getQueue st party queueId + liftIO $ getQueue st party queueId verifyCmdAuthorization :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> C.APublicAuthKey -> Bool verifyCmdAuthorization auth_ tAuth authorized key = maybe False (verify key) tAuth @@ -1039,11 +1039,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi ids@(rId, _) <- getIds -- create QueueRec record with these ids and keys let qr = qRec ids - atomically (addQueue st qr) >>= \case + liftIO (addQueue st qr) >>= \case Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec Left e -> pure $ ERR e - Right _ -> do - withLog (`logCreateById` rId) + Right () -> do + withLog (`logCreateQueue` qr) stats <- asks serverStats incStat $ qCreated stats incStat $ qCount stats @@ -1052,12 +1052,6 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi SMSubscribe -> void $ subscribeQueue qr rId pure $ IDS (qik ids) - logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () - logCreateById s rId = - atomically (getQueue st SRecipient rId) >>= \case - Right q -> logCreateQueue s q - _ -> pure () - getIds :: M (RecipientId, SenderId) getIds = do n <- asks $ queueIdBytes . config @@ -1069,7 +1063,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi st <- asks queueStore stats <- asks serverStats incStat $ qSecured stats - atomically $ either ERR (const OK) <$> secureQueue st rId sKey + liftIO $ either ERR (const OK) <$> secureQueue st rId sKey addQueueNotifier_ :: QueueStore -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg) addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do @@ -1082,7 +1076,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi addNotifierRetry n rcvPublicDhKey rcvNtfDhSecret = do notifierId <- randomId =<< asks (queueIdBytes . config) let ntfCreds = NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} - atomically (addQueueNotifier st entId ntfCreds) >>= \case + liftIO (addQueueNotifier st entId ntfCreds) >>= \case Left DUPLICATE_ -> addNotifierRetry (n - 1) rcvPublicDhKey rcvNtfDhSecret Left e -> pure $ ERR e Right _ -> do @@ -1093,7 +1087,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg) deleteQueueNotifier_ st = do withLog (`logDeleteNotifier` entId) - atomically (deleteQueueNotifier st entId) >>= \case + liftIO (deleteQueueNotifier st entId) >>= \case Right () -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ writeTQueue ntfSubscribedQ (entId, clnt, False) @@ -1104,7 +1098,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg) suspendQueue_ st = do withLog (`logSuspendQueue` entId) - okResp <$> atomically (suspendQueue st entId) + okResp <$> liftIO (suspendQueue st entId) subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg) subscribeQueue qr rId = do @@ -1130,7 +1124,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deliver :: Bool -> Sub -> M (Transmission BrokerMsg) deliver inc sub = do q <- getStoreMsgQueue "SUB" rId - msg_ <- atomically $ tryPeekMsg q + msg_ <- liftIO $ tryPeekMsgIO q when (inc && isJust msg_) $ incStat . qSub =<< asks serverStats deliverMessage "SUB" qr rId sub msg_ @@ -1161,6 +1155,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi q <- getStoreMsgQueue "GET" entId stats <- asks serverStats (statCnt, r) <- + -- TODO split STM, use tryPeekMsgIO atomically $ tryPeekMsg q >>= \case Just msg -> @@ -1199,11 +1194,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi q <- getStoreMsgQueue "ACK" entId case st of ProhibitSub -> do - deletedMsg_ <- atomically $ tryDelMsg q msgId + deletedMsg_ <- liftIO $ tryDelMsg q msgId mapM_ (updateStats True) deletedMsg_ pure ok _ -> do - (deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId + (deletedMsg_, msg_) <- liftIO $ tryDelPeekMsg q msgId mapM_ (updateStats False) deletedMsg_ deliverMessage "ACK" qr entId sub msg_ _ -> pure $ err NO_MSG @@ -1246,7 +1241,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi msg_ <- time "SEND" $ do q <- getStoreMsgQueue "SEND" $ recipientId qr expireMessages q - atomically . writeMsg q =<< mkMessage body + liftIO . writeMsg q =<< mkMessage body case msg_ of Nothing -> do incStat $ msgSentQuota stats @@ -1273,7 +1268,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi expireMessages q = do msgExp <- asks $ messageExpiration . config old <- liftIO $ mapM expireBeforeEpoch msgExp - deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old + deleted <- liftIO $ sum <$> mapM (deleteExpiredMsgs q) old when (deleted > 0) $ do stats <- asks serverStats liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) @@ -1290,6 +1285,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi tryDeliverMessage msg = atomically deliverToSub >>= mapM_ forkDeliver where rId = recipientId qr + -- TODO split to multiple STM transactions, move lookups to IO + -- remove tryPeekMsg deliverToSub = TM.lookup rId subscribers $>>= \rc@Client {subscriptions = subs, sndQ = q} -> TM.lookup rId subs @@ -1450,7 +1447,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi delQueueAndMsgs st = do withLog (`logDeleteQueue` entId) ms <- asks msgStore - atomically (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case + liftIO (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case Right q -> do -- Possibly, the same should be done if the queue is suspended, but currently we do not use it atomically $ writeTQueue subscribedQ (entId, clnt, False) @@ -1465,7 +1462,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi q <- getStoreMsgQueue "getQueueInfo" entId qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub qiSize <- liftIO $ getQueueSize q - qiMsg <- atomically $ toMsgInfo <$$> tryPeekMsg q + qiMsg <- liftIO $ toMsgInfo <$$> tryPeekMsgIO q let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} pure (corrId, entId, INFO info) where @@ -1571,7 +1568,7 @@ restoreServerMessages = s = LB.toStrict s' addToMsgQueue rId msg = do q <- liftIO $ getMsgQueue ms rId quota - (isExpired, logFull) <- atomically $ case msg of + (isExpired, logFull) <- liftIO $ case msg of Message {msgTs} | maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg | otherwise -> pure (True, False) diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 04447292a..de994c17a 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -16,7 +16,7 @@ module Simplex.Messaging.Server.MsgStore.STM delMsgQueueSize, writeMsg, tryPeekMsg, - peekMsg, + tryPeekMsgIO, tryDelMsg, tryDelPeekMsg, deleteExpiredMsgs, @@ -61,14 +61,14 @@ getMsgQueue st rId quota = TM.lookupIO rId st >>= maybe (atomically maybeNewQ) p TM.insert rId q st pure q -delMsgQueue :: STMMsgStore -> RecipientId -> STM () -delMsgQueue st rId = TM.delete rId st +delMsgQueue :: STMMsgStore -> RecipientId -> IO () +delMsgQueue st rId = atomically $ TM.delete rId st -delMsgQueueSize :: STMMsgStore -> RecipientId -> STM Int -delMsgQueueSize st rId = TM.lookupDelete rId st >>= maybe (pure 0) (\MsgQueue {size} -> readTVar size) +delMsgQueueSize :: STMMsgStore -> RecipientId -> IO Int +delMsgQueueSize st rId = atomically (TM.lookupDelete rId st) >>= maybe (pure 0) (\MsgQueue {size} -> readTVarIO size) -writeMsg :: MsgQueue -> Message -> STM (Maybe (Message, Bool)) -writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = do +writeMsg :: MsgQueue -> Message -> IO (Maybe (Message, Bool)) +writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = atomically $ do canWrt <- readTVar canWrite empty <- isEmptyTQueue q if canWrt || empty @@ -83,43 +83,44 @@ writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = do where msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg} +tryPeekMsgIO :: MsgQueue -> IO (Maybe Message) +tryPeekMsgIO = atomically . tryPeekTQueue . msgQueue +{-# INLINE tryPeekMsgIO #-} + +-- TODO remove once deliverToSub is split tryPeekMsg :: MsgQueue -> STM (Maybe Message) tryPeekMsg = tryPeekTQueue . msgQueue {-# INLINE tryPeekMsg #-} -peekMsg :: MsgQueue -> STM Message -peekMsg = peekTQueue . msgQueue -{-# INLINE peekMsg #-} - -tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message) -tryDelMsg mq msgId' = +tryDelMsg :: MsgQueue -> MsgId -> IO (Maybe Message) +tryDelMsg mq msgId' = atomically $ tryPeekMsg mq >>= \case msg_@(Just msg) - | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_ + | msgId msg == msgId' || B.null msgId' -> tryDeleteMsg_ mq >> pure msg_ | otherwise -> pure Nothing _ -> pure Nothing -- atomic delete (== read) last and peek next message if available -tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message) -tryDelPeekMsg mq msgId' = +tryDelPeekMsg :: MsgQueue -> MsgId -> IO (Maybe Message, Maybe Message) +tryDelPeekMsg mq msgId' = atomically $ tryPeekMsg mq >>= \case msg_@(Just msg) - | msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq) + | msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg_ mq >> tryPeekMsg mq) | otherwise -> pure (Nothing, msg_) _ -> pure (Nothing, Nothing) -deleteExpiredMsgs :: MsgQueue -> Int64 -> STM Int -deleteExpiredMsgs mq old = loop 0 +deleteExpiredMsgs :: MsgQueue -> Int64 -> IO Int +deleteExpiredMsgs mq old = atomically $ loop 0 where loop dc = tryPeekMsg mq >>= \case Just Message {msgTs} | systemSeconds msgTs < old -> - tryDeleteMsg mq >> loop (dc + 1) + tryDeleteMsg_ mq >> loop (dc + 1) _ -> pure dc -tryDeleteMsg :: MsgQueue -> STM () -tryDeleteMsg MsgQueue {msgQueue = q, size} = +tryDeleteMsg_ :: MsgQueue -> STM () +tryDeleteMsg_ MsgQueue {msgQueue = q, size} = tryReadTQueue q >>= \case Just _ -> modifyTVar' size (subtract 1) _ -> pure () diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 50907cf9a..3a1385269 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -45,8 +45,8 @@ newQueueStore = do notifiers <- TM.emptyIO pure QueueStore {queues, senders, notifiers} -addQueue :: QueueStore -> QueueRec -> STM (Either ErrorType ()) -addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = sId} = do +addQueue :: QueueStore -> QueueRec -> IO (Either ErrorType ()) +addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = sId} = atomically $ do ifM hasId (pure $ Left DUPLICATE_) $ do qVar <- newTVar q TM.insert rId qVar queues @@ -55,26 +55,26 @@ addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = where hasId = (||) <$> TM.member rId queues <*> TM.member sId senders -getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> STM (Either ErrorType QueueRec) +getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> IO (Either ErrorType QueueRec) getQueue QueueStore {queues, senders, notifiers} party qId = - toResult <$> (mapM readTVar =<< getVar) + toResult <$> (mapM readTVarIO =<< getVar) where getVar = case party of - SRecipient -> TM.lookup qId queues - SSender -> TM.lookup qId senders $>>= (`TM.lookup` queues) - SNotifier -> TM.lookup qId notifiers $>>= (`TM.lookup` queues) + SRecipient -> TM.lookupIO qId queues + SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues) + SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues) -secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> STM (Either ErrorType QueueRec) +secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec) secureQueue QueueStore {queues} rId sKey = - withQueue rId queues $ \qVar -> + atomically $ withQueue rId queues $ \qVar -> readTVar qVar >>= \q -> case senderKey q of Just k -> pure $ if sKey == k then Just q else Nothing _ -> let !q' = q {senderKey = Just sKey} in writeTVar qVar q' $> Just q' -addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> STM (Either ErrorType QueueRec) -addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do +addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType QueueRec) +addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = atomically $ do ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ withQueue rId queues $ \qVar -> do q <- readTVar qVar @@ -83,20 +83,20 @@ addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierI TM.insert nId rId notifiers pure $ Just q -deleteQueueNotifier :: QueueStore -> RecipientId -> STM (Either ErrorType ()) +deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType ()) deleteQueueNotifier QueueStore {queues, notifiers} rId = - withQueue rId queues $ \qVar -> do + atomically $ withQueue rId queues $ \qVar -> do q <- readTVar qVar forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers writeTVar qVar $! q {notifier = Nothing} pure $ Just () -suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ()) +suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ()) suspendQueue QueueStore {queues} rId = - withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just () + atomically $ withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just () -deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType QueueRec) -deleteQueue QueueStore {queues, senders, notifiers} rId = do +deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec) +deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do TM.lookupDelete rId queues >>= \case Just qVar -> readTVar qVar >>= \q -> do diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index 3b741e6ce..10522c5bc 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -2,7 +2,6 @@ module Simplex.Messaging.Transport.HTTP2 where -import Control.Concurrent.STM import qualified Control.Exception as E import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B